You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/04/18 17:06:35 UTC

[5/6] activemq-artemis git commit: ARTEMIS-1121 Improving expiry scanner

ARTEMIS-1121 Improving expiry scanner

https://issues.apache.org/jira/browse/ARTEMIS-1121


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1a397724
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1a397724
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1a397724

Branch: refs/heads/master
Commit: 1a397724898259d5451cd441f5f6b301d2c8571b
Parents: 31d78ed
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Apr 18 10:29:29 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Apr 18 11:49:25 2017 -0400

----------------------------------------------------------------------
 .../protocol/amqp/broker/AMQPMessage.java       |   3 +-
 .../core/paging/impl/PagingStoreImpl.java       |   2 +-
 .../core/server/ActiveMQServerLogger.java       |   6 +-
 .../artemis/core/server/impl/QueueImpl.java     |  46 ++++-
 tests/smoke-tests/pom.xml                       |  17 ++
 .../main/resources/servers/expire/broker.xml    | 184 +++++++++++++++++++
 .../tests/smoke/expire/TestSimpleExpire.java    |  92 ++++++++++
 7 files changed, 338 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index d627fd5..d447ecd 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -633,7 +633,8 @@ public class AMQPMessage extends RefCountMessage {
 
    private synchronized void checkBuffer() {
       if (!bufferValid) {
-         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1500);
+         int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
+         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
          try {
             getProtonMessage().encode(new NettyWritable(buffer));
             byte[] bytes = new byte[buffer.writerIndex()];

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 03f53c6..6486ec9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -683,7 +683,7 @@ public class PagingStoreImpl implements PagingStore {
                   if (pagingManager.isDiskFull()) {
                      ActiveMQServerLogger.LOGGER.blockingDiskFull(address);
                   } else {
-                     ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize);
+                     ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize());
                   }
                   blocking.set(true);
                }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 9aaba7c..48507a0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -944,7 +944,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
    void errorExpiringReferencesNoBindings(SimpleString expiryAddress);
 
    @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 222147, value = "Message has expired. No expiry queue configured for queue {0} so dropping it", format = Message.Format.MESSAGE_FORMAT)
+   @Message(id = 222147, value = "Messages are being expired on queue {0}. However there is no expiry queue configured, hence messages will be dropped.", format = Message.Format.MESSAGE_FORMAT)
    void errorExpiringReferencesNoQueue(SimpleString name);
 
    @LogMessage(level = Logger.Level.WARN)
@@ -1104,8 +1104,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
    void missingClusterConfigForScaleDown(String scaleDownCluster);
 
    @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 222183, value = "Blocking message production on address ''{0}''; size is currently: {1} bytes; max-size-bytes: {2}", format = Message.Format.MESSAGE_FORMAT)
-   void blockingMessageProduction(SimpleString addressName, long currentSize, long maxSize);
+   @Message(id = 222183, value = "Blocking message production on address ''{0}''; size is currently: {1} bytes; max-size-bytes on address: {2}, global-max-size is {3}", format = Message.Format.MESSAGE_FORMAT)
+   void blockingMessageProduction(SimpleString addressName, long currentSize, long maxSize, long globalMaxSize);
 
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 222184,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index dc4b090..a62ae79 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -146,6 +146,8 @@ public class QueueImpl implements Queue {
 
    private final LinkedListIterator<PagedReference> pageIterator;
 
+   private volatile boolean printErrorExpiring = false;
+
    // Messages will first enter intermediateMessageReferences
    // Before they are added to messageReferences
    // This is to avoid locking the queue on the producer
@@ -1567,27 +1569,52 @@ public class QueueImpl implements Queue {
             if (queueDestroyed) {
                return;
             }
+            logger.debug("Scanning for expires on " + QueueImpl.this.getName());
 
             LinkedListIterator<MessageReference> iter = iterator();
 
+            boolean expired = false;
+            boolean hasElements = false;
+
+            int elementsExpired = 0;
             try {
-               boolean expired = false;
-               boolean hasElements = false;
+               Transaction tx = null;
+
                while (postOffice.isStarted() && iter.hasNext()) {
                   hasElements = true;
                   MessageReference ref = iter.next();
                   try {
                      if (ref.getMessage().isExpired()) {
+                        if (tx == null) {
+                           tx = new TransactionImpl(storageManager);
+                        }
                         incDelivering();
                         expired = true;
-                        expire(ref);
+                        expire(tx, ref);
                         iter.remove();
                         refRemoved(ref);
+
+                        if (++elementsExpired >= MAX_DELIVERIES_IN_LOOP) {
+                           logger.debug("Breaking loop of expiring");
+                           scannerRunning.incrementAndGet();
+                           getExecutor().execute(this);
+                           break;
+                        }
                      }
+
                   } catch (Exception e) {
                      ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(e, ref);
                   }
+               }
 
+               logger.debug("Expired " + elementsExpired + " references");
+
+               try {
+                  if (tx != null) {
+                     tx.commit();
+                  }
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
                }
 
                // If empty we need to schedule depaging to make sure we would depage expired messages as well
@@ -1600,6 +1627,8 @@ public class QueueImpl implements Queue {
                } catch (Throwable ignored) {
                }
                scannerRunning.decrementAndGet();
+               logger.debug("Scanning for expires on " + QueueImpl.this.getName() + " done");
+
             }
          }
       }
@@ -1912,7 +1941,6 @@ public class QueueImpl implements Queue {
       return "QueueImpl[name=" + name.toString() + ", postOffice=" + this.postOffice + ", temp=" + this.temporary + "]@" + Integer.toHexString(System.identityHashCode(this));
    }
 
-
    private synchronized void internalAddTail(final MessageReference ref) {
       refAdded(ref);
       messageReferences.addTail(ref, getPriority(ref));
@@ -2519,7 +2547,11 @@ public class QueueImpl implements Queue {
             move(expiryAddress, tx, ref, true, true);
          }
       } else {
-         ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name);
+         if (!printErrorExpiring) {
+            printErrorExpiring = true;
+            // print this only once
+            ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name);
+         }
 
          acknowledge(tx, ref);
       }
@@ -3015,7 +3047,7 @@ public class QueueImpl implements Queue {
             if (messagesIterator != null && messagesIterator.hasNext()) {
                MessageReference msg = messagesIterator.next();
                if (msg.isPaged()) {
-                  previouslyBrowsed.add(((PagedReference)msg).getPosition());
+                  previouslyBrowsed.add(((PagedReference) msg).getPosition());
                }
                return msg;
             } else {
@@ -3156,7 +3188,7 @@ public class QueueImpl implements Queue {
          if (consumersSet.size() == 0) {
             logger.debug("There are no consumers, no need to check slow consumer's rate");
             return;
-         } else if (queueRate  < (threshold * consumersSet.size())) {
+         } else if (queueRate < (threshold * consumersSet.size())) {
             if (logger.isDebugEnabled()) {
                logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/tests/smoke-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml
index 4b06386..4734706 100644
--- a/tests/smoke-tests/pom.xml
+++ b/tests/smoke-tests/pom.xml
@@ -172,6 +172,23 @@
                      <configuration>${basedir}/target/classes/servers/replicated-static1</configuration>
                   </configuration>
                </execution>
+               <execution>
+                  <phase>test-compile</phase>
+                  <id>create-expire</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <!-- this makes it easier in certain envs -->
+                     <configuration>${basedir}/target/classes/servers/expire</configuration>
+                     <javaOptions>-Dartemis.debug.paging.interval=1</javaOptions>
+                     <allowAnonymous>true</allowAnonymous>
+                     <user>admin</user>
+                     <password>admin</password>
+                     <instance>${basedir}/target/expire</instance>
+                  </configuration>
+               </execution>
+
             </executions>
             <dependencies>
                <dependency>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/tests/smoke-tests/src/main/resources/servers/expire/broker.xml
----------------------------------------------------------------------
diff --git a/tests/smoke-tests/src/main/resources/servers/expire/broker.xml b/tests/smoke-tests/src/main/resources/servers/expire/broker.xml
new file mode 100644
index 0000000..a4176f8
--- /dev/null
+++ b/tests/smoke-tests/src/main/resources/servers/expire/broker.xml
@@ -0,0 +1,184 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>0.0.0.0</name>
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO or NIO
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      <large-messages-directory>./data/large-messages</large-messages-directory>
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>-1</journal-pool-files>
+
+      <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+      <!--
+        You can specify the NIC you want to use to verify that the network is reachable
+         <network-check-NIC>theNickName</network-check-NIC>
+        -->
+
+      <!--
+        Use this to use an HTTP server to validate the network
+         <network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+      <!-- <network-check-period>10000</network-check-period> -->
+      <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+      <!-- this is a comma separated list, no spaces, just DNS or IPs
+           it should accept IPV6
+
+           Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
+                    Using IPs that could eventually disappear or be partially visible may defeat the purpose.
+                    You can use a list of multiple IPs; a successful ping to any one of them is enough for the server to continue running -->
+      <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+      <!-- use this to customize the ping used for ipv4 addresses -->
+      <!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
+
+      <!-- use this to customize the ping used for ipv6 addresses -->
+      <!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
+
+
+
+
+      <!-- how often we are looking for how many bytes are being used on the disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <!-- the system will enter into page mode once you hit this limit.
+           This is an estimate in bytes of how much the messages are using in memory
+
+            The system will use half of the available memory (-Xmx) by default for the global-max-size.
+            You may specify a different value here if you need to customize it to your needs.
+
+            <global-max-size>100Mb</global-max-size>
+
+      -->
+
+      <acceptors>
+
+         <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
+         <!-- amqpCredits: The number of credits sent to AMQP producers -->
+         <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
+
+         <!-- Acceptor for every supported protocol -->
+         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+
+         <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
+         <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
+
+         <!-- STOMP Acceptor. -->
+         <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
+
+         <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
+         <acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
+
+         <!-- MQTT Acceptor -->
+         <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
+
+      </acceptors>
+
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="guest"/>
+            <permission type="deleteNonDurableQueue" roles="guest"/>
+            <permission type="createDurableQueue" roles="guest"/>
+            <permission type="deleteDurableQueue" roles="guest"/>
+            <permission type="createAddress" roles="guest"/>
+            <permission type="deleteAddress" roles="guest"/>
+            <permission type="consume" roles="guest"/>
+            <permission type="browse" roles="guest"/>
+            <permission type="send" roles="guest"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="guest"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to be auto-create -->
+         <address-setting match="activemq.management#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-create-jms-queues>true</auto-create-jms-queues>
+            <auto-create-jms-topics>true</auto-create-jms-topics>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-create-jms-queues>true</auto-create-jms-queues>
+            <auto-create-jms-topics>true</auto-create-jms-topics>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ" />
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue" />
+            </anycast>
+         </address>
+
+      </addresses>
+
+   </core>
+</configuration>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1a397724/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/expire/TestSimpleExpire.java
----------------------------------------------------------------------
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/expire/TestSimpleExpire.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/expire/TestSimpleExpire.java
new file mode 100644
index 0000000..40bd6fc
--- /dev/null
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/expire/TestSimpleExpire.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.expire;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Smoke test: messages that expire on the broker must never be delivered to a consumer. */
+public class TestSimpleExpire extends SmokeTestBase {
+
+   public static final String SERVER_NAME_0 = "expire";
+
+   /** Cleans up the data of the "expire" server and starts it before each test. */
+   @Before
+   public void before() throws Exception {
+      cleanupData(SERVER_NAME_0);
+      disableCheckThread();
+      startServer(SERVER_NAME_0, 0, 30000);
+   }
+
+   /**
+    * Sends 20000 messages with a 1 second TTL, waits for them to expire, then sends 500
+    * messages without TTL and checks that the first 500 deliveries are exactly those.
+    */
+   @Test
+   public void testSendExpire() throws Exception {
+      ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
+      Connection connection = factory.createConnection();
+      try {
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue("q0");
+         MessageProducer producer = session.createProducer(queue);
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+         producer.setTimeToLive(1000);
+         for (int i = 0; i < 20000; i++) {
+            producer.send(session.createTextMessage("expired"));
+            if ((i + 1) % 5000 == 0) {
+               session.commit();
+               System.out.println("Sent " + (i + 1) + " messages");
+            }
+         }
+         session.commit();
+
+         // Give the expiry scanner (message-expiry-scan-period is 1000 ms in broker.xml) time to run.
+         Thread.sleep(5000);
+         producer.setTimeToLive(0);
+         for (int i = 0; i < 500; i++) {
+            producer.send(session.createTextMessage("ok"));
+         }
+         session.commit();
+
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+         for (int i = 0; i < 500; i++) {
+            TextMessage txt = (TextMessage) consumer.receive(10000);
+            Assert.assertNotNull(txt);
+            Assert.assertEquals("ok", txt.getText());
+         }
+         session.commit();
+      } finally {
+         // Always release the connection, even when an assertion above fails.
+         connection.close();
+      }
+   }
+}