Posted to dev@activemq.apache.org by "Timothy Bish (JIRA)" <ji...@apache.org> on 2014/10/22 00:01:35 UTC

[jira] [Closed] (AMQ-5390) MQTT pending durable subscriber messages are not delivered after broker restart

     [ https://issues.apache.org/jira/browse/AMQ-5390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timothy Bish closed AMQ-5390.
-----------------------------
    Resolution: Not a Problem

Added a new test to the test suite showing that this works as expected: pending messages are recovered after a broker restart.
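
For readers who want the expected behavior in miniature, here is a toy model of it. This is not ActiveMQ code and not the test that was added: the class DurableSubscriptionModel and everything inside it are hypothetical names, and the "store" is just an in-memory object standing in for the broker's persistent store. It only illustrates the assumption under discussion, that messages queued for an offline durable subscriber (MQTT clean session = false) are delivered after a restart as long as the restarted broker reopens the same store.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy model only: not ActiveMQ code. All names here are hypothetical. */
public class DurableSubscriptionModel {

    /** Stands in for the broker's persistent store; it outlives broker instances. */
    static final class MessageStore {
        // clientId -> messages queued while that durable subscriber is offline
        final Map<String, Queue<String>> pending = new HashMap<>();
    }

    static final class Broker {
        private final MessageStore store;

        Broker(MessageStore store) {
            this.store = store;
        }

        /** Registers a durable subscription (MQTT clean session = false). */
        void subscribeDurable(String clientId) {
            store.pending.computeIfAbsent(clientId, id -> new ArrayDeque<>());
        }

        /** Queues the message for every registered durable subscriber. */
        void publish(String message) {
            for (Queue<String> queue : store.pending.values()) {
                queue.add(message);
            }
        }

        /** Delivers and removes everything queued for the reconnecting client. */
        List<String> reconnect(String clientId) {
            Queue<String> queue = store.pending.getOrDefault(clientId, new ArrayDeque<>());
            List<String> delivered = new ArrayList<>(queue);
            queue.clear();
            return delivered;
        }
    }

    /** Runs the scenario; a restart is modelled by creating a new Broker. */
    static List<String> runScenario(boolean persistent) {
        MessageStore disk = new MessageStore();
        Broker broker = new Broker(disk);
        broker.subscribeDurable("MQTT-Sub-Client");

        // The subscriber is treated as offline from here on; two messages arrive.
        broker.publish("m1");
        broker.publish("m2");

        // Restart: a persistent broker reopens the same store,
        // a non-persistent one comes back with an empty store.
        broker = new Broker(persistent ? disk : new MessageStore());
        return broker.reconnect("MQTT-Sub-Client");
    }

    public static void main(String[] args) {
        System.out.println("persistent:     " + runScenario(true));
        System.out.println("non-persistent: " + runScenario(false));
    }
}
```

Running main prints [m1, m2] for the persistent case and [] for the non-persistent one. In this model, pending messages are lost only when the restarted broker does not come back on the same store.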

> MQTT pending durable subscriber messages are not delivered after broker restart
> -------------------------------------------------------------------------------
>
>                 Key: AMQ-5390
>                 URL: https://issues.apache.org/jira/browse/AMQ-5390
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: MQTT
>    Affects Versions: 5.11.0
>            Reporter: AR
>
> If messages are pending for a durable subscriber when the broker is restarted, they are not delivered when the subscriber reconnects after the restart.
> I modified the existing test case testReceiveMessageSentWhileOffline() and added the test case testReceiveMessageSentWhileOfflineAndBrokerRestart(), shown below.
> Changes:
> * Uses a standalone broker, as I was not sure whether the embedded broker persists messages to a permanent store.
> * The broker must be restarted manually when the test prompts for it.
> {noformat}
>     // 100 runs with a 30 s restart pause each need far more than 60 s.
>     @Test(timeout = 60 * 60 * 1000)
>     public void testReceiveMessageSentWhileOfflineAndBrokerRestart() throws Exception {
>         final byte[] payload = new byte[1024 * 32];
>         Arrays.fill(payload, (byte) '2');
>         int numberOfRuns = 100;
>         int messagesPerRun = 2;
>         final MQTT mqttPub = createMQTTConnection("MQTT-Pub-Client", true);
>         final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
>         mqttPub.setHost("tcp://localhost:1883");
>         mqttSub.setHost("tcp://localhost:1883");
>         final BlockingConnection connectionPub = mqttPub.blockingConnection();
>         connectionPub.connect();
>         BlockingConnection connectionSub = mqttSub.blockingConnection();
>         connectionSub.connect();
>         Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) };
>         connectionSub.subscribe(topics);
>         for (int i = 0; i < messagesPerRun; ++i) {
>             connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
>         }
>         int received = 0;
>         for (int i = 0; i < messagesPerRun; ++i) {
>             Message message = connectionSub.receive(5, TimeUnit.SECONDS);
>             assertNotNull(message);
>             received++;
>             assertTrue(Arrays.equals(payload, message.getPayload()));
>             message.ack();
>         }
>         connectionSub.disconnect();
>         for (int j = 0; j < numberOfRuns; j++) {
>             for (int i = 0; i < messagesPerRun; ++i) {
>                 connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
>             }
>             
>             System.out.println("Restart broker here.....");
>             Thread.sleep(30000);
>             
>             connectionSub = mqttSub.blockingConnection();
>             connectionSub.connect();
>             connectionSub.subscribe(topics);
>             for (int i = 0; i < messagesPerRun; ++i) {
>                 Message message = connectionSub.receive(5, TimeUnit.SECONDS);
>                 assertNotNull(message);
>                 received++;
>                 assertTrue(Arrays.equals(payload, message.getPayload()));
>                 message.ack();
>             }
>             connectionSub.disconnect();
>         }
>         assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received);
>     }
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)