You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/05/26 11:07:30 UTC

[03/13] git commit: Partial fix for AMQ-5160: attempts to resolve retained messages using a subscription recovery policy, but does not yet resend retained messages for duplicate subscriptions

Partial fix for AMQ-5160: attempts to resolve retained messages using a subscription recovery policy, but does not yet resend retained messages for duplicate subscriptions


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bcb60a48
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bcb60a48
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bcb60a48

Branch: refs/heads/trunk
Commit: bcb60a482cdc1cd2d2de9b4dba6a38ef831b2fa4
Parents: ba519d8
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Wed May 7 19:05:36 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:18 2014 +0200

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Topic.java    |  14 ++-
 ...tainedMessageSubscriptionRecoveryPolicy.java | 107 +++++++++++++++++++
 .../transport/mqtt/MQTTProtocolConverter.java   |  71 +++++-------
 .../activemq/transport/mqtt/MQTTTest.java       |  86 ++++++++++++---
 4 files changed, 215 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/bcb60a48/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 277ce05..4744af8 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -33,7 +33,7 @@ import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
-import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
+import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
 import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
 import org.apache.activemq.broker.util.InsertionCountList;
@@ -91,7 +91,7 @@ public class Topic extends BaseDestination implements Task {
             subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
             setAlwaysRetroactive(true);
         } else {
-            subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
+            subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
         }
         this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
     }
@@ -675,8 +675,14 @@ public class Topic extends BaseDestination implements Task {
         return subscriptionRecoveryPolicy;
     }
 
-    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
-        this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
+    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) {
+        if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) {
+            // allow users to combine retained message policy with other ActiveMQ policies
+            RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy;
+            policy.setWrapped(recoveryPolicy);
+        } else {
+            this.subscriptionRecoveryPolicy = recoveryPolicy;
+        }
     }
 
     // Implementation methods

http://git-wip-us.apache.org/repos/asf/activemq/blob/bcb60a48/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
new file mode 100644
index 0000000..d350a5f
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.SubscriptionRecovery;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.DestinationFilter;
+
+/**
+ * This implementation of {@link org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy} will only keep the
+ * last non-zero length message whose {@link #RETAIN_PROPERTY} is true; a zero-length retained message clears it.
+ *
+ * @org.apache.xbean.XBean
+ *
+ */
+public class RetainedMessageSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
+
+    public static final String RETAIN_PROPERTY = "ActiveMQRetain";
+    public static final String RETAINED_PROPERTY = "ActiveMQRetained";
+    private volatile MessageReference retainedMessage;
+    private SubscriptionRecoveryPolicy wrapped;
+
+    public RetainedMessageSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy wrapped) {
+        this.wrapped = wrapped;
+    }
+
+    public boolean add(ConnectionContext context, MessageReference node) throws Exception {
+        final Message message = node.getMessage();
+        final Object retainValue = message.getProperty(RETAIN_PROPERTY);
+        // retain property set to true
+        final boolean retain = retainValue != null && Boolean.parseBoolean(retainValue.toString());
+        if (retain) {
+            if (message.getContent().getLength() > 0) {
+                // non zero length message content
+                retainedMessage = message.copy();
+                retainedMessage.getMessage().setProperty(RETAINED_PROPERTY, true);
+            } else {
+                // clear retained message
+                retainedMessage = null;
+            }
+            // TODO should we remove the publisher's retain property??
+            node.getMessage().removeProperty(RETAIN_PROPERTY);
+        }
+        return wrapped == null ? true : wrapped.add(context, node);
+    }
+
+    public void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception {
+        // Re-dispatch the last retained message seen.
+        if (retainedMessage != null) {
+            sub.addRecoveredMessage(context, retainedMessage);
+        }
+        if (wrapped != null) {
+            wrapped.recover(context, topic, sub);
+        }
+    }
+
+    public void start() throws Exception {
+    }
+
+    public void stop() throws Exception {
+    }
+
+    public Message[] browse(ActiveMQDestination destination) throws Exception {
+        List<Message> result = new ArrayList<Message>();
+        if (retainedMessage != null) {
+            DestinationFilter filter = DestinationFilter.parseFilter(destination);
+            if (filter.matches(retainedMessage.getMessage().getDestination())) {
+                result.add(retainedMessage.getMessage());
+            }
+        }
+        return result.toArray(new Message[result.size()]);
+    }
+
+    public SubscriptionRecoveryPolicy copy() {
+        return new RetainedMessageSubscriptionRecoveryPolicy(wrapped);
+    }
+    
+    public void setBroker(Broker broker) {        
+    }
+
+    public void setWrapped(SubscriptionRecoveryPolicy wrapped) {
+        this.wrapped = wrapped;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bcb60a48/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 0e590f0..ebb9f45 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -28,6 +28,7 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
 import org.apache.activemq.command.*;
 import org.apache.activemq.store.PersistenceAdapterSupport;
 import org.apache.activemq.util.ByteArrayOutputStream;
@@ -80,13 +81,11 @@ public class MQTTProtocolConverter {
     private long defaultKeepAlive;
     private int activeMQSubscriptionPrefetch=1;
     private final String QOS_PROPERTY_NAME = "QoSPropertyName";
-    private final MQTTRetainedMessages retainedMessages;
     private final MQTTPacketIdGenerator packetIdGenerator;
 
     public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) {
         this.mqttTransport = mqttTransport;
         this.brokerService = brokerService;
-        this.retainedMessages = MQTTRetainedMessages.getMQTTRetainedMessages(brokerService);
         this.packetIdGenerator = MQTTPacketIdGenerator.getMQTTPacketIdGenerator(brokerService);
         this.defaultKeepAlive = 0;
     }
@@ -344,36 +343,6 @@ public class MQTTProtocolConverter {
             } catch (IOException e) {
                 LOG.warn("Couldn't send SUBACK for " + command, e);
             }
-            // check retained messages
-            for (int i = 0; i < topics.length; i++) {
-                if (qos[i] == SUBSCRIBE_ERROR) {
-                    // skip this topic if subscribe failed
-                    continue;
-                }
-                final Topic topic = topics[i];
-                ActiveMQTopic destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
-                for (PUBLISH msg : retainedMessages.getMessages(destination)) {
-                    if( msg.payload().length > 0 ) {
-                        try {
-                            PUBLISH retainedCopy = new PUBLISH();
-                            retainedCopy.topicName(msg.topicName());
-                            retainedCopy.retain(msg.retain());
-                            retainedCopy.payload(msg.payload());
-                            // set QoS of retained message to maximum of subscription QoS
-                            retainedCopy.qos(msg.qos().ordinal() > qos[i] ? QoS.values()[qos[i]] : msg.qos());
-                            switch (retainedCopy.qos()) {
-                                case AT_LEAST_ONCE:
-                                case EXACTLY_ONCE:
-                                    retainedCopy.messageId(packetIdGenerator.getNextSequenceId(getClientId()));
-                                case AT_MOST_ONCE:
-                            }
-                            getMQTTTransport().sendToMQTT(retainedCopy.encode());
-                        } catch (IOException e) {
-                            LOG.warn("Couldn't send retained message " + msg, e);
-                        }
-                    }
-                }
-            }
         } else {
             LOG.warn("No topics defined for Subscription " + command);
         }
@@ -382,28 +351,33 @@ public class MQTTProtocolConverter {
 
     byte onSubscribe(final Topic topic) throws MQTTProtocolException {
 
-        if( mqttSubscriptionByTopic.containsKey(topic.name())) {
-            if (topic.qos() != mqttSubscriptionByTopic.get(topic.name()).qos()) {
+        final UTF8Buffer topicName = topic.name();
+        final QoS topicQoS = topic.qos();
+        if( mqttSubscriptionByTopic.containsKey(topicName)) {
+            if (topicQoS != mqttSubscriptionByTopic.get(topicName).qos()) {
                 // remove old subscription as the QoS has changed
-                onUnSubscribe(topic.name());
+                onUnSubscribe(topicName);
             } else {
-                // duplicate SUBSCRIBE packet, nothing to do
-                return (byte) topic.qos().ordinal();
+                // duplicate SUBSCRIBE packet
+                // TODO find all matching topics and resend retained messages
+                return (byte) topicQoS.ordinal();
             }
+            onUnSubscribe(topicName);
         }
 
-        ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
+        ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName.toString()));
 
         ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
         ConsumerInfo consumerInfo = new ConsumerInfo(id);
         consumerInfo.setDestination(destination);
         consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
+        consumerInfo.setRetroactive(true);
         consumerInfo.setDispatchAsync(true);
         // create durable subscriptions only when cleansession is false
-        if ( !connect.cleanSession() && connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) {
-            consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString());
+        if ( !connect.cleanSession() && connect.clientId() != null && topicQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) {
+            consumerInfo.setSubscriptionName(topicQoS + ":" + topicName.toString());
         }
-        MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
+        MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicQoS, consumerInfo);
 
         final byte[] qos = {-1};
         sendToActiveMQ(consumerInfo, new ResponseHandler() {
@@ -412,17 +386,17 @@ public class MQTTProtocolConverter {
                 // validate subscription request
                 if (response.isException()) {
                     final Throwable throwable = ((ExceptionResponse) response).getException();
-                    LOG.warn("Error subscribing to " + topic.name(), throwable);
+                    LOG.warn("Error subscribing to " + topicName, throwable);
                     qos[0] = SUBSCRIBE_ERROR;
                 } else {
-                    qos[0] = (byte) topic.qos().ordinal();
+                    qos[0] = (byte) topicQoS.ordinal();
                 }
             }
         });
 
         if (qos[0] != SUBSCRIBE_ERROR) {
             subscriptionsByConsumerId.put(id, mqttSubscription);
-            mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
+            mqttSubscriptionByTopic.put(topicName, mqttSubscription);
         }
 
         return qos[0];
@@ -508,9 +482,6 @@ public class MQTTProtocolConverter {
     void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
         checkConnected();
         ActiveMQMessage message = convertMessage(command);
-        if (command.retain()){
-            retainedMessages.addMessage((ActiveMQTopic) message.getDestination(), command);
-        }
         message.setProducerId(producerId);
         message.onSend();
         sendToActiveMQ(message, createResponseHandler(command));
@@ -570,6 +541,9 @@ public class MQTTProtocolConverter {
         msg.setPriority((byte) Message.DEFAULT_PRIORITY);
         msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain());
         msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
+        if (command.retain()) {
+            msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);
+        }
 
         ActiveMQTopic topic;
         synchronized (activeMQTopicMap) {
@@ -597,6 +571,9 @@ public class MQTTProtocolConverter {
             qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
         }
         result.qos(qoS);
+        if (message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)) {
+            result.retain(true);
+        }
 
         UTF8Buffer topicName;
         synchronized (mqttTopicMap) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/bcb60a48/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 3143cfc..37016b8 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -16,16 +16,17 @@
  */
 package org.apache.activemq.transport.mqtt;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNotEquals;
-
 import java.net.ProtocolException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
-
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -34,15 +35,25 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotEquals;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.filter.DestinationMapEntry;
 import org.apache.activemq.jaas.GroupPrincipal;
-import org.apache.activemq.security.*;
+import org.apache.activemq.security.AuthenticationUser;
+import org.apache.activemq.security.AuthorizationEntry;
+import org.apache.activemq.security.AuthorizationPlugin;
+import org.apache.activemq.security.DefaultAuthorizationMap;
+import org.apache.activemq.security.SimpleAuthenticationPlugin;
+import org.apache.activemq.security.SimpleAuthorizationMap;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.Wait;
 import org.fusesource.mqtt.client.BlockingConnection;
@@ -368,7 +379,7 @@ public class MQTTTest extends AbstractMQTTTest {
 
             connection.subscribe(new Topic[] { new Topic(topic, QoS.AT_LEAST_ONCE) });
             Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
-            assertNotNull(msg);
+            assertNotNull("No message for " + topic, msg);
             assertEquals(RETAINED + topic, new String(msg.getPayload()));
             msg.ack();
 
@@ -390,16 +401,17 @@ public class MQTTTest extends AbstractMQTTTest {
 
             connection = mqtt.blockingConnection();
             connection.connect();
-            connection.subscribe(new Topic[] { new Topic(wildcard, QoS.AT_LEAST_ONCE) });
+            final byte[] qos = connection.subscribe(new Topic[]{new Topic(wildcard, QoS.AT_LEAST_ONCE)});
+            assertNotEquals("Subscribe failed " + wildcard, (byte)0x80, qos[0]);
 
             // test retained messages
-            Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+            Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
             do {
                 assertNotNull("RETAINED null " + wildcard, msg);
                 assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED));
                 assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches());
                 msg.ack();
-                msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+                msg = connection.receive(5000, TimeUnit.MILLISECONDS);
             } while (msg != null);
 
             // connection is borked after timeout in connection.receive()
@@ -499,10 +511,10 @@ public class MQTTTest extends AbstractMQTTTest {
 
         QoS[] qoss = { QoS.AT_MOST_ONCE, QoS.AT_MOST_ONCE, QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE };
         for (QoS qos : qoss) {
-            connection.subscribe(new Topic[] { new Topic("TopicA", qos) });
+            connection.subscribe(new Topic[]{new Topic("TopicA", qos)});
 
             final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
-            assertNotNull(msg);
+            assertNotNull("No message for " + qos, msg);
             assertEquals(RETAIN, new String(msg.getPayload()));
             msg.ack();
             int waitCount = 0;
@@ -1340,6 +1352,56 @@ public class MQTTTest extends AbstractMQTTTest {
         assertNull("Shouldn't receive the message", msg);
     }
 
+    @Test(timeout = 60 * 1000)
+    public void testActiveMQRecoveryPolicy() throws Exception {
+        addMQTTConnector();
+
+        brokerService.start();
+
+        // test with ActiveMQ LastImageSubscriptionRecoveryPolicy
+        final PolicyMap policyMap = new PolicyMap();
+        final PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setSubscriptionRecoveryPolicy(new LastImageSubscriptionRecoveryPolicy());
+        policyMap.put(new ActiveMQTopic(">"), policyEntry);
+        brokerService.setDestinationPolicy(policyMap);
+
+        MQTT mqtt = createMQTTConnection("pub-sub", true);
+        final int[] retain = new int[1];
+        final int[] nonretain  = new int[1];
+        mqtt.setTracer(new Tracer() {
+            @Override
+            public void onReceive(MQTTFrame frame) {
+                if (frame.messageType() == PUBLISH.TYPE) {
+                    LOG.info("Received message with retain=" + frame.retain());
+                    if (frame.retain()) {
+                        retain[0]++;
+                    } else {
+                        nonretain[0]++;
+                    }
+                }
+            }
+        });
+
+        BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+        final String RETAINED = "RETAINED";
+        connection.publish("one", RETAINED.getBytes(), QoS.AT_LEAST_ONCE, true);
+        connection.publish("two", RETAINED.getBytes(), QoS.AT_LEAST_ONCE, true);
+
+        final String NONRETAINED = "NONRETAINED";
+        connection.publish("one", NONRETAINED.getBytes(), QoS.AT_LEAST_ONCE, false);
+        connection.publish("two", NONRETAINED.getBytes(), QoS.AT_LEAST_ONCE, false);
+
+        connection.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)});
+        for (int i = 0; i < 4; i++) {
+            final Message message = connection.receive(30, TimeUnit.SECONDS);
+            assertNotNull("Should receive 4 messages", message);
+            message.ack();
+        }
+        assertEquals("Should receive 2 retained messages", 2, retain[0]);
+        assertEquals("Should receive 2 non-retained messages", 2, nonretain[0]);
+    }
+
     @Override
     protected String getProtocolScheme() {
         return "mqtt";