You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/05/26 11:07:30 UTC
[03/13] git commit: Partial fix for AMQ-5160,
attempts to resolve retained messages using subscription recovery
policy, but fails to resend retained messages for duplicate subscriptions
Partial fix for AMQ-5160, attempts to resolve retained messages using subscription recovery policy, but fails to resend retained messages for duplicate subscriptions
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bcb60a48
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bcb60a48
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bcb60a48
Branch: refs/heads/trunk
Commit: bcb60a482cdc1cd2d2de9b4dba6a38ef831b2fa4
Parents: ba519d8
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Wed May 7 19:05:36 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:18 2014 +0200
----------------------------------------------------------------------
.../apache/activemq/broker/region/Topic.java | 14 ++-
...tainedMessageSubscriptionRecoveryPolicy.java | 107 +++++++++++++++++++
.../transport/mqtt/MQTTProtocolConverter.java | 71 +++++-------
.../activemq/transport/mqtt/MQTTTest.java | 86 ++++++++++++---
4 files changed, 215 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/bcb60a48/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 277ce05..4744af8 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -33,7 +33,7 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
-import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
+import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.broker.util.InsertionCountList;
@@ -91,7 +91,7 @@ public class Topic extends BaseDestination implements Task {
subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
setAlwaysRetroactive(true);
} else {
- subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
+ subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
}
this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
}
@@ -675,8 +675,14 @@ public class Topic extends BaseDestination implements Task {
return subscriptionRecoveryPolicy;
}
- public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
- this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
+    /**
+     * Installs the given recovery policy on this topic.
+     * <p>
+     * When the policy currently in place is a
+     * {@link RetainedMessageSubscriptionRecoveryPolicy}, it is kept and the new
+     * policy is handed to it via {@code setWrapped}, so users can combine the
+     * retained message policy with other ActiveMQ policies. Otherwise the
+     * current policy is simply replaced.
+     *
+     * @param recoveryPolicy the policy to install or wrap
+     */
+    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) {
+        // instanceof is false for null, so no separate null check is needed
+        if (!(this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy)) {
+            this.subscriptionRecoveryPolicy = recoveryPolicy;
+            return;
+        }
+        // keep the retained message policy in front and delegate to the new one
+        ((RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy).setWrapped(recoveryPolicy);
}
// Implementation methods
http://git-wip-us.apache.org/repos/asf/activemq/blob/bcb60a48/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
new file mode 100644
index 0000000..d350a5f
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.SubscriptionRecovery;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.DestinationFilter;
+
+/**
+ * A {@link SubscriptionRecoveryPolicy} that remembers at most one message: the
+ * last message added with the {@link #RETAIN_PROPERTY} property set to true and
+ * a non-zero length body. A message with the retain property and a zero length
+ * body clears the remembered message instead. The remembered copy is tagged
+ * with {@link #RETAINED_PROPERTY} and handed to subscriptions on recovery.
+ * <p>
+ * An optional wrapped policy receives every {@code add} and {@code recover}
+ * call after the retained message handling. It is not consulted by
+ * {@code browse}, and {@code start}, {@code stop} and {@code setBroker} are
+ * not forwarded to it.
+ *
+ * @org.apache.xbean.XBean
+ *
+ */
+public class RetainedMessageSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
+
+    /** Message property a publisher sets to true to ask for the message to be retained. */
+    public static final String RETAIN_PROPERTY = "ActiveMQRetain";
+    /** Message property set to true on the stored copy that is replayed on recovery. */
+    public static final String RETAINED_PROPERTY = "ActiveMQRetained";
+
+    // Written by add(), read by recover() and browse(). Readers copy the field
+    // into a local first so that a concurrent clear cannot happen between the
+    // null check and the use.
+    private volatile MessageReference retainedMessage;
+    // Optional delegate policy, may be null.
+    private SubscriptionRecoveryPolicy wrapped;
+
+    /**
+     * @param wrapped policy to delegate to after the retained message handling,
+     *                or {@code null} for none
+     */
+    public RetainedMessageSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy wrapped) {
+        this.wrapped = wrapped;
+    }
+
+    /**
+     * Inspects a message for the retain property and updates the remembered
+     * message, then delegates to the wrapped policy if there is one.
+     *
+     * @param context connection context, passed through to the wrapped policy
+     * @param node reference to the message being added
+     * @return {@code true} when there is no wrapped policy, otherwise the
+     *         wrapped policy's result
+     * @throws Exception if reading or changing message properties fails, or
+     *                   the wrapped policy throws
+     */
+    public boolean add(ConnectionContext context, MessageReference node) throws Exception {
+        final Message message = node.getMessage();
+        final Object retainValue = message.getProperty(RETAIN_PROPERTY);
+        // retain property set to true
+        final boolean retain = retainValue != null && Boolean.parseBoolean(retainValue.toString());
+        if (retain) {
+            // NOTE(review): getContent() is dereferenced without a null check,
+            // as in the original; confirm it is never null for retain messages.
+            if (message.getContent().getLength() > 0) {
+                // non zero length message content: tag the copy completely
+                // before publishing it through the volatile field, so readers
+                // never see an untagged copy and the field is not re-read
+                final Message retainedCopy = message.copy();
+                retainedCopy.setProperty(RETAINED_PROPERTY, true);
+                retainedMessage = retainedCopy;
+            } else {
+                // clear retained message
+                retainedMessage = null;
+            }
+            // TODO should we remove the publisher's retain property??
+            node.getMessage().removeProperty(RETAIN_PROPERTY);
+        }
+        final SubscriptionRecoveryPolicy delegate = wrapped;
+        return delegate == null || delegate.add(context, node);
+    }
+
+    /**
+     * Hands the remembered message, if any, to the recovering subscription and
+     * then lets the wrapped policy, if any, recover as well.
+     *
+     * @param context connection context of the recovering subscription
+     * @param topic topic being recovered, passed through to the wrapped policy
+     * @param sub subscription that receives the recovered messages
+     * @throws Exception if the subscription or the wrapped policy throws
+     */
+    public void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception {
+        // Re-dispatch the last retained message seen; single read of the field.
+        final MessageReference retained = retainedMessage;
+        if (retained != null) {
+            sub.addRecoveredMessage(context, retained);
+        }
+        final SubscriptionRecoveryPolicy delegate = wrapped;
+        if (delegate != null) {
+            delegate.recover(context, topic, sub);
+        }
+    }
+
+    /** No-op. */
+    public void start() throws Exception {
+    }
+
+    /** No-op. */
+    public void stop() throws Exception {
+    }
+
+    /**
+     * Returns the remembered message when its destination matches the given
+     * destination filter.
+     *
+     * @param destination destination, possibly with wildcards, to match against
+     * @return an array holding the remembered message, or an empty array when
+     *         nothing is remembered or the destination does not match
+     * @throws Exception declared by the interface
+     */
+    public Message[] browse(ActiveMQDestination destination) throws Exception {
+        List<Message> result = new ArrayList<Message>();
+        // single read of the volatile field, see the field comment
+        final MessageReference retained = retainedMessage;
+        if (retained != null) {
+            DestinationFilter filter = DestinationFilter.parseFilter(destination);
+            if (filter.matches(retained.getMessage().getDestination())) {
+                result.add(retained.getMessage());
+            }
+        }
+        return result.toArray(new Message[result.size()]);
+    }
+
+    /**
+     * Creates a new policy with no remembered message.
+     * NOTE(review): the new policy shares this policy's wrapped instance rather
+     * than a copy of it; confirm that sharing is intended.
+     */
+    public SubscriptionRecoveryPolicy copy() {
+        return new RetainedMessageSubscriptionRecoveryPolicy(wrapped);
+    }
+
+    /** No-op. */
+    public void setBroker(Broker broker) {
+    }
+
+    /**
+     * Replaces the wrapped policy.
+     *
+     * @param wrapped new delegate policy, or {@code null} for none
+     */
+    public void setWrapped(SubscriptionRecoveryPolicy wrapped) {
+        this.wrapped = wrapped;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bcb60a48/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 0e590f0..ebb9f45 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -28,6 +28,7 @@ import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
import org.apache.activemq.command.*;
import org.apache.activemq.store.PersistenceAdapterSupport;
import org.apache.activemq.util.ByteArrayOutputStream;
@@ -80,13 +81,11 @@ public class MQTTProtocolConverter {
private long defaultKeepAlive;
private int activeMQSubscriptionPrefetch=1;
private final String QOS_PROPERTY_NAME = "QoSPropertyName";
- private final MQTTRetainedMessages retainedMessages;
private final MQTTPacketIdGenerator packetIdGenerator;
public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) {
this.mqttTransport = mqttTransport;
this.brokerService = brokerService;
- this.retainedMessages = MQTTRetainedMessages.getMQTTRetainedMessages(brokerService);
this.packetIdGenerator = MQTTPacketIdGenerator.getMQTTPacketIdGenerator(brokerService);
this.defaultKeepAlive = 0;
}
@@ -344,36 +343,6 @@ public class MQTTProtocolConverter {
} catch (IOException e) {
LOG.warn("Couldn't send SUBACK for " + command, e);
}
- // check retained messages
- for (int i = 0; i < topics.length; i++) {
- if (qos[i] == SUBSCRIBE_ERROR) {
- // skip this topic if subscribe failed
- continue;
- }
- final Topic topic = topics[i];
- ActiveMQTopic destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
- for (PUBLISH msg : retainedMessages.getMessages(destination)) {
- if( msg.payload().length > 0 ) {
- try {
- PUBLISH retainedCopy = new PUBLISH();
- retainedCopy.topicName(msg.topicName());
- retainedCopy.retain(msg.retain());
- retainedCopy.payload(msg.payload());
- // set QoS of retained message to maximum of subscription QoS
- retainedCopy.qos(msg.qos().ordinal() > qos[i] ? QoS.values()[qos[i]] : msg.qos());
- switch (retainedCopy.qos()) {
- case AT_LEAST_ONCE:
- case EXACTLY_ONCE:
- retainedCopy.messageId(packetIdGenerator.getNextSequenceId(getClientId()));
- case AT_MOST_ONCE:
- }
- getMQTTTransport().sendToMQTT(retainedCopy.encode());
- } catch (IOException e) {
- LOG.warn("Couldn't send retained message " + msg, e);
- }
- }
- }
- }
} else {
LOG.warn("No topics defined for Subscription " + command);
}
@@ -382,28 +351,33 @@ public class MQTTProtocolConverter {
byte onSubscribe(final Topic topic) throws MQTTProtocolException {
- if( mqttSubscriptionByTopic.containsKey(topic.name())) {
- if (topic.qos() != mqttSubscriptionByTopic.get(topic.name()).qos()) {
+ final UTF8Buffer topicName = topic.name();
+ final QoS topicQoS = topic.qos();
+ if( mqttSubscriptionByTopic.containsKey(topicName)) {
+ if (topicQoS != mqttSubscriptionByTopic.get(topicName).qos()) {
// remove old subscription as the QoS has changed
- onUnSubscribe(topic.name());
+ onUnSubscribe(topicName);
} else {
- // duplicate SUBSCRIBE packet, nothing to do
- return (byte) topic.qos().ordinal();
+ // duplicate SUBSCRIBE packet
+ // TODO find all matching topics and resend retained messages
+ return (byte) topicQoS.ordinal();
}
+ onUnSubscribe(topicName);
}
- ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
+ ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName.toString()));
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setDestination(destination);
consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
+ consumerInfo.setRetroactive(true);
consumerInfo.setDispatchAsync(true);
// create durable subscriptions only when cleansession is false
- if ( !connect.cleanSession() && connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) {
- consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString());
+ if ( !connect.cleanSession() && connect.clientId() != null && topicQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) {
+ consumerInfo.setSubscriptionName(topicQoS + ":" + topicName.toString());
}
- MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
+ MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicQoS, consumerInfo);
final byte[] qos = {-1};
sendToActiveMQ(consumerInfo, new ResponseHandler() {
@@ -412,17 +386,17 @@ public class MQTTProtocolConverter {
// validate subscription request
if (response.isException()) {
final Throwable throwable = ((ExceptionResponse) response).getException();
- LOG.warn("Error subscribing to " + topic.name(), throwable);
+ LOG.warn("Error subscribing to " + topicName, throwable);
qos[0] = SUBSCRIBE_ERROR;
} else {
- qos[0] = (byte) topic.qos().ordinal();
+ qos[0] = (byte) topicQoS.ordinal();
}
}
});
if (qos[0] != SUBSCRIBE_ERROR) {
subscriptionsByConsumerId.put(id, mqttSubscription);
- mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
+ mqttSubscriptionByTopic.put(topicName, mqttSubscription);
}
return qos[0];
@@ -508,9 +482,6 @@ public class MQTTProtocolConverter {
void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
checkConnected();
ActiveMQMessage message = convertMessage(command);
- if (command.retain()){
- retainedMessages.addMessage((ActiveMQTopic) message.getDestination(), command);
- }
message.setProducerId(producerId);
message.onSend();
sendToActiveMQ(message, createResponseHandler(command));
@@ -570,6 +541,9 @@ public class MQTTProtocolConverter {
msg.setPriority((byte) Message.DEFAULT_PRIORITY);
msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain());
msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
+ if (command.retain()) {
+ msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);
+ }
ActiveMQTopic topic;
synchronized (activeMQTopicMap) {
@@ -597,6 +571,9 @@ public class MQTTProtocolConverter {
qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
}
result.qos(qoS);
+ if (message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)) {
+ result.retain(true);
+ }
UTF8Buffer topicName;
synchronized (mqttTopicMap) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/bcb60a48/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 3143cfc..37016b8 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -16,16 +16,17 @@
*/
package org.apache.activemq.transport.mqtt;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNotEquals;
-
import java.net.ProtocolException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
-
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -34,15 +35,25 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotEquals;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.jaas.GroupPrincipal;
-import org.apache.activemq.security.*;
+import org.apache.activemq.security.AuthenticationUser;
+import org.apache.activemq.security.AuthorizationEntry;
+import org.apache.activemq.security.AuthorizationPlugin;
+import org.apache.activemq.security.DefaultAuthorizationMap;
+import org.apache.activemq.security.SimpleAuthenticationPlugin;
+import org.apache.activemq.security.SimpleAuthorizationMap;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
@@ -368,7 +379,7 @@ public class MQTTTest extends AbstractMQTTTest {
connection.subscribe(new Topic[] { new Topic(topic, QoS.AT_LEAST_ONCE) });
Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
- assertNotNull(msg);
+ assertNotNull("No message for " + topic, msg);
assertEquals(RETAINED + topic, new String(msg.getPayload()));
msg.ack();
@@ -390,16 +401,17 @@ public class MQTTTest extends AbstractMQTTTest {
connection = mqtt.blockingConnection();
connection.connect();
- connection.subscribe(new Topic[] { new Topic(wildcard, QoS.AT_LEAST_ONCE) });
+ final byte[] qos = connection.subscribe(new Topic[]{new Topic(wildcard, QoS.AT_LEAST_ONCE)});
+ assertNotEquals("Subscribe failed " + wildcard, (byte)0x80, qos[0]);
// test retained messages
- Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+ Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
do {
assertNotNull("RETAINED null " + wildcard, msg);
assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED));
assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches());
msg.ack();
- msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+ msg = connection.receive(5000, TimeUnit.MILLISECONDS);
} while (msg != null);
// connection is borked after timeout in connection.receive()
@@ -499,10 +511,10 @@ public class MQTTTest extends AbstractMQTTTest {
QoS[] qoss = { QoS.AT_MOST_ONCE, QoS.AT_MOST_ONCE, QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE };
for (QoS qos : qoss) {
- connection.subscribe(new Topic[] { new Topic("TopicA", qos) });
+ connection.subscribe(new Topic[]{new Topic("TopicA", qos)});
final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
- assertNotNull(msg);
+ assertNotNull("No message for " + qos, msg);
assertEquals(RETAIN, new String(msg.getPayload()));
msg.ack();
int waitCount = 0;
@@ -1340,6 +1352,56 @@ public class MQTTTest extends AbstractMQTTTest {
assertNull("Shouldn't receive the message", msg);
}
+    /**
+     * Publishes one retained and one non-retained message to each of two topics
+     * with a LastImageSubscriptionRecoveryPolicy configured for all topics,
+     * then subscribes with a wildcard and expects four messages: two flagged
+     * retained and two not.
+     */
+    @Test(timeout = 60 * 1000)
+    public void testActiveMQRecoveryPolicy() throws Exception {
+        addMQTTConnector();
+
+        brokerService.start();
+
+        // test with ActiveMQ LastImageSubscriptionRecoveryPolicy
+        final PolicyEntry lastImageEntry = new PolicyEntry();
+        lastImageEntry.setSubscriptionRecoveryPolicy(new LastImageSubscriptionRecoveryPolicy());
+        final PolicyMap destinationPolicies = new PolicyMap();
+        destinationPolicies.put(new ActiveMQTopic(">"), lastImageEntry);
+        brokerService.setDestinationPolicy(destinationPolicies);
+
+        MQTT mqtt = createMQTTConnection("pub-sub", true);
+        // single-element arrays so the anonymous tracer can update the counters
+        final int[] retainedFrames = new int[1];
+        final int[] plainFrames = new int[1];
+        mqtt.setTracer(new Tracer() {
+            @Override
+            public void onReceive(MQTTFrame frame) {
+                if (frame.messageType() != PUBLISH.TYPE) {
+                    return;
+                }
+                LOG.info("Received message with retain=" + frame.retain());
+                if (frame.retain()) {
+                    retainedFrames[0]++;
+                } else {
+                    plainFrames[0]++;
+                }
+            }
+        });
+
+        BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+
+        // retained publishes first, then the non-retained ones, each to "one" then "two"
+        final String[] topicNames = { "one", "two" };
+        final String RETAINED = "RETAINED";
+        for (String topicName : topicNames) {
+            connection.publish(topicName, RETAINED.getBytes(), QoS.AT_LEAST_ONCE, true);
+        }
+        final String NONRETAINED = "NONRETAINED";
+        for (String topicName : topicNames) {
+            connection.publish(topicName, NONRETAINED.getBytes(), QoS.AT_LEAST_ONCE, false);
+        }
+
+        connection.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)});
+        int outstanding = 4;
+        while (outstanding > 0) {
+            final Message received = connection.receive(30, TimeUnit.SECONDS);
+            assertNotNull("Should receive 4 messages", received);
+            received.ack();
+            outstanding--;
+        }
+        assertEquals("Should receive 2 retained messages", 2, retainedFrames[0]);
+        assertEquals("Should receive 2 non-retained messages", 2, plainFrames[0]);
+    }
+
+
@Override
protected String getProtocolScheme() {
return "mqtt";