Posted to dev@activemq.apache.org by "Timothy Bish (JIRA)" <ji...@apache.org> on 2014/10/22 00:01:35 UTC
[jira] [Closed] (AMQ-5390) MQTT pending durable subscriber messages
are not delivered after broker restart
[ https://issues.apache.org/jira/browse/AMQ-5390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timothy Bish closed AMQ-5390.
-----------------------------
Resolution: Not a Problem
Added a new test to the test suite to show that things are working as expected: messages are recovered after a broker restart.
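
For readers unfamiliar with the expected behaviour, here is a toy model of what such a test checks: messages published while a durable subscriber is offline should still be delivered after a restart, because they live in the persistent store rather than in the broker process. This is only a sketch and not ActiveMQ code. The names DurableSubscriptionSketch, Store, Broker and deliveredAfterRestart are hypothetical, and the model assumes the store survives the restart intact.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy model only: NOT ActiveMQ code. All names here are hypothetical. */
public class DurableSubscriptionSketch {

    /** Stands in for the persistent store; it outlives any one broker instance. */
    static final class Store {
        // clientId -> messages waiting for that durable subscriber
        final Map<String, Queue<String>> pending = new HashMap<>();
    }

    /** A broker instance; "restarting" means building a new one on the same Store. */
    static final class Broker {
        private final Store store;

        Broker(Store store) {
            this.store = store;
        }

        /** Registers a durable subscription (in MQTT terms, cleanSession = false). */
        void subscribe(String clientId) {
            store.pending.computeIfAbsent(clientId, id -> new ArrayDeque<>());
        }

        /** Queues the message for every durable subscription known to the store. */
        void publish(String message) {
            for (Queue<String> queue : store.pending.values()) {
                queue.add(message);
            }
        }

        /** Called when the subscriber reconnects: drains everything still pending. */
        List<String> reconnect(String clientId) {
            Queue<String> queue = store.pending.getOrDefault(clientId, new ArrayDeque<>());
            List<String> delivered = new ArrayList<>(queue);
            queue.clear();
            return delivered;
        }
    }

    /** Publishes while the subscriber is offline, restarts, then reconnects. */
    public static List<String> deliveredAfterRestart(int messageCount) {
        Store store = new Store();
        Broker broker = new Broker(store);
        broker.subscribe("MQTT-Sub-Client");
        for (int i = 0; i < messageCount; i++) {
            broker.publish("message-" + i);
        }
        broker = new Broker(store); // simulated restart: new broker, same store
        return broker.reconnect("MQTT-Sub-Client");
    }

    public static void main(String[] args) {
        System.out.println(deliveredAfterRestart(2)); // prints [message-0, message-1]
    }
}
```

The design point is that the pending queue belongs to the store, not to the Broker object, so replacing the broker does not lose messages. A second reconnect returns an empty list because the first one drained the queue.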
> MQTT pending durable subscriber messages are not delivered after broker restart
> -------------------------------------------------------------------------------
>
> Key: AMQ-5390
> URL: https://issues.apache.org/jira/browse/AMQ-5390
> Project: ActiveMQ
> Issue Type: Bug
> Components: MQTT
> Affects Versions: 5.11.0
> Reporter: AR
>
> If messages are pending delivery to a subscriber when the broker is restarted, they are not delivered to the subscriber when it connects after the restart.
> I modified the existing test case testReceiveMessageSentWhileOffline() and added the test case testReceiveMessageSentWhileOfflineAndBrokerRestart(), shown below.
> Changes:
> * Use a standalone broker, as I was not sure whether the embedded broker persists messages to a permanent store.
> * The broker must be restarted manually when the test prompts for it.
> {noformat}
> @Test(timeout = 60 * 1000)
> public void testReceiveMessageSentWhileOfflineAndBrokerRestart() throws Exception {
>     final byte[] payload = new byte[1024 * 32];
>     for (int i = 0; i < payload.length; i++) {
>         payload[i] = '2';
>     }
>     int numberOfRuns = 100;
>     int messagesPerRun = 2;
>
>     // Publisher and subscriber both connect to the standalone broker.
>     final MQTT mqttPub = createMQTTConnection("MQTT-Pub-Client", true);
>     final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
>     mqttPub.setHost("tcp://localhost:1883");
>     mqttSub.setHost("tcp://localhost:1883");
>     final BlockingConnection connectionPub = mqttPub.blockingConnection();
>     connectionPub.connect();
>     BlockingConnection connectionSub = mqttSub.blockingConnection();
>     connectionSub.connect();
>     Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) };
>     connectionSub.subscribe(topics);
>
>     // Initial run: the subscriber is online while messages are published.
>     for (int i = 0; i < messagesPerRun; ++i) {
>         connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
>     }
>     int received = 0;
>     for (int i = 0; i < messagesPerRun; ++i) {
>         Message message = connectionSub.receive(5, TimeUnit.SECONDS);
>         assertNotNull(message);
>         received++;
>         assertTrue(Arrays.equals(payload, message.getPayload()));
>         message.ack();
>     }
>     connectionSub.disconnect();
>
>     // Note: each run sleeps 30 s, so 100 runs cannot finish within the 60 s timeout above.
>     for (int j = 0; j < numberOfRuns; j++) {
>         // Publish while the subscriber is offline.
>         for (int i = 0; i < messagesPerRun; ++i) {
>             connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
>         }
>
>         // Window for restarting the broker by hand.
>         System.out.println("Restart broker here.....");
>         Thread.sleep(30000);
>
>         // Reconnect and expect the messages published while offline.
>         connectionSub = mqttSub.blockingConnection();
>         connectionSub.connect();
>         connectionSub.subscribe(topics);
>         for (int i = 0; i < messagesPerRun; ++i) {
>             Message message = connectionSub.receive(5, TimeUnit.SECONDS);
>             assertNotNull(message);
>             received++;
>             assertTrue(Arrays.equals(payload, message.getPayload()));
>             message.ack();
>         }
>         connectionSub.disconnect();
>     }
>     assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received);
> }
> {noformat}