You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "AR (JIRA)" <ji...@apache.org> on 2014/09/15 23:08:34 UTC

[jira] [Created] (AMQ-5358) MQTT Durable subscription broken in 5.10 and 5.11

AR created AMQ-5358:
-----------------------

             Summary: MQTT Durable subscription broken in 5.10 and 5.11
                 Key: AMQ-5358
                 URL: https://issues.apache.org/jira/browse/AMQ-5358
             Project: ActiveMQ
          Issue Type: Bug
          Components: MQTT
    Affects Versions: 5.10.0, 5.11.0
         Environment: Mac OS X, MQTT Paho client library version 0.4.0
            Reporter: AR


Durable subscriptions do not work in 5.10 and 5.11-SNAPSHOT.
Test case:
Run default broker.

. Connect "client1" with clean_session=true.
. Subscribe to topic "paho/test"
. Disconnect "client1"
. Connect "client1" with clean_session=false
. Subscribe to topic "paho/test"
. Disconnect "client1"

. Connect "client2"
. Publish message "hello world" to topic "paho/test"
. Disconnect "client2"

. Connect "client1" with clean_session=false
. Subscribe to topic "paho/test"
. Verify that the message is received.


Here is the JUnit test code:
----------------------------------
package com.mytests.activemqtests;

import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.Test;

import junit.framework.TestCase;


/**
 * Reproduction test for AMQ-5358: a message published to a topic while a
 * client holding a non-clean-session subscription is offline should be
 * delivered once that client reconnects.
 *
 * <p>The test talks to a broker expected to be listening on
 * {@link #BROKER1_URL}; it does not start one itself.
 */
public class MqttDurableSubTest extends TestCase implements MqttCallback {

	final static String BROKER1_URL = "tcp://localhost:1883";
	
	final static int MQTT_QOS_ATMOSTONCE = 0;
	final static int MQTT_QOS_ATLEASTONCE = 1;
	final static int MQTT_QOS_EXACTLYONCE = 2;
	
	// Written by messageArrived() and read by the test method, so it must be
	// visible across threads.
	// NOTE(review): assumes Paho invokes callbacks on a thread other than the
	// test thread - confirm.
	private volatile boolean mMessageReceived = false;

	// Created eagerly so that messageArrived() can never observe a null latch,
	// even if the broker delivers the message immediately after reconnect.
	private final CountDownLatch mLatch = new CountDownLatch(1);

	public MqttDurableSubTest(String name) {
		super(name);
	}

	protected void setUp() throws Exception {
		super.setUp();
	}

	protected void tearDown() throws Exception {
		super.tearDown();
	}

	/**
	 * Runs the four phases of the scenario:
	 * <ol>
	 *   <li>client1 connects with clean session, subscribes, disconnects;</li>
	 *   <li>client1 connects without clean session, subscribes, disconnects;</li>
	 *   <li>client2 connects, publishes "hello world", disconnects;</li>
	 *   <li>client1 reconnects without clean session, subscribes again, and
	 *       waits up to 10 seconds for the message.</li>
	 * </ol>
	 *
	 * @throws Exception if any MQTT operation fails or the wait is interrupted
	 */
	@Test
	public void testDuplicateMsg() throws Exception {

		MqttConnectOptions client1ConnOpt = new MqttConnectOptions();
		MqttClient client1 = new MqttClient(BROKER1_URL, "client1");
		
		// Phase 1: clean-session subscription. Pass the options explicitly so
		// the flag set here is actually the one used for the connection.
		client1ConnOpt.setCleanSession(true);
		client1.connect(client1ConnOpt);
		System.out.println("Client1 Connected to " + BROKER1_URL);
		client1.subscribe("paho/test", MQTT_QOS_ATMOSTONCE);
		Thread.sleep(2000);
		client1.disconnect();
		System.out.println("Client1 completed cleansession=1 subscription.");
		
		// Phase 2: durable (non-clean-session) subscription.
		client1ConnOpt.setCleanSession(false);
		client1.setCallback(this);
		client1.connect(client1ConnOpt);

		System.out.println("Client1 Connected to " + BROKER1_URL);
		client1.subscribe("paho/test", MQTT_QOS_ATMOSTONCE);
		Thread.sleep(2000);
		client1.disconnect();
		System.out.println("Client1 completed durable subscription.");
		
		// Phase 3: publish while client1 is offline. The clean-session flag is
		// set on client2's own options (it was previously applied to client1's
		// options by mistake).
		MqttConnectOptions client2ConnOpt = new MqttConnectOptions();
		client2ConnOpt.setCleanSession(true);
		MqttClient client2 = new MqttClient(BROKER1_URL, "client2");
		client2.setCallback(this);
		client2.connect(client2ConnOpt);
		client2.publish("paho/test", "hello world".getBytes(), MQTT_QOS_ATMOSTONCE, false);
		System.out.println("Client2 published");
		client2.disconnect();

		// Phase 4: reconnect client1 and expect the retained-for-session message.
		client1ConnOpt.setCleanSession(false);
		client1.setCallback(this);
		client1.connect(client1ConnOpt);
		try {
			System.out.println("Client1 Connected to " + BROKER1_URL);
			client1.subscribe("paho/test", MQTT_QOS_ATMOSTONCE);

			// await() returns false on timeout; mMessageReceived is checked as
			// well so the assertion reflects what the callback recorded.
			boolean latchReleased = mLatch.await(10, TimeUnit.SECONDS);
			assertTrue("Message published while client1 was offline was not delivered within 10 seconds",
					latchReleased && mMessageReceived);
			System.out.println("Done");
		} finally {
			// Do not leave the connection open when the assertion fails.
			if (client1.isConnected()) {
				client1.disconnect();
			}
		}
	}

	/** No-op: connection loss is not relevant to this scenario. */
	@Override
	public void connectionLost(Throwable arg0) {
	}

	/** No-op: publish completion is not checked by this test. */
	@Override
	public void deliveryComplete(IMqttDeliveryToken arg0) {
	}

	/**
	 * Logs every incoming message and, when its payload text is
	 * "hello world", records the arrival and releases the waiting test.
	 *
	 * @param topic   topic the message arrived on
	 * @param message the received message
	 */
	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception {

		System.out.println("MQTT - messageArrived "+topic+" , Message: ["+message+"] , ReceviedTime: ["+ new Date() +"] , QoS: ["+message.getQos()+"] , isDup: ["+message.isDuplicate()+"]" );
		if (message.toString().equals("hello world")) {
			// Set the flag before releasing the latch so the test thread sees
			// it as soon as await() returns.
			mMessageReceived = true;
			mLatch.countDown();
		}
	}
}




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)