You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/01/07 15:32:21 UTC
svn commit: r1429809 - in /activemq/trunk/activemq-mqtt/src:
main/java/org/apache/activemq/transport/mqtt/
test/java/org/apache/activemq/transport/mqtt/
Author: rajdavies
Date: Mon Jan 7 14:32:20 2013
New Revision: 1429809
URL: http://svn.apache.org/viewvc?rev=1429809&view=rev
Log:
Fix instability in test cases
Modified:
activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Modified: activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?rev=1429809&r1=1429808&r2=1429809&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java (original)
+++ activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java Mon Jan 7 14:32:20 2013
@@ -85,6 +85,7 @@ class MQTTProtocolConverter {
private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 1.5;
+ private static final int DEFAULT_CACHE_SIZE = 5000;
private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
private final SessionId sessionId = new SessionId(connectionId, -1);
@@ -95,10 +96,10 @@ class MQTTProtocolConverter {
private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
private final ConcurrentHashMap<UTF8Buffer, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, MQTTSubscription>();
- private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>();
- private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>();
- private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>();
- private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>();
+ private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>(DEFAULT_CACHE_SIZE);
+ private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>(DEFAULT_CACHE_SIZE);
+ private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
+ private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);
private final MQTTTransport mqttTransport;
private final Object commnadIdMutex = new Object();
@@ -108,6 +109,7 @@ class MQTTProtocolConverter {
private CONNECT connect;
private String clientId;
private long defaultKeepAlive;
+ private int activeMQSubscriptionPrefetch=1;
private final String QOS_PROPERTY_NAME = "QoSPropertyName";
public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) {
@@ -125,7 +127,7 @@ class MQTTProtocolConverter {
command.setCommandId(generateCommandId());
if (handler != null) {
command.setResponseRequired(true);
- resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
+ resposeHandlers.put(command.getCommandId(), handler);
}
mqttTransport.sendToActiveMQ(command);
}
@@ -297,7 +299,7 @@ class MQTTProtocolConverter {
ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setDestination(destination);
- consumerInfo.setPrefetchSize(1000);
+ consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
consumerInfo.setDispatchAsync(true);
if (!connect.cleanSession() && (connect.clientId() != null)) {
//by default subscribers are persistent
@@ -316,8 +318,8 @@ class MQTTProtocolConverter {
void onUnSubscribe(UNSUBSCRIBE command) {
UTF8Buffer[] topics = command.topics();
if (topics != null) {
- for (int i = 0; i < topics.length; i++) {
- onUnSubscribe(topics[i]);
+ for (UTF8Buffer topic : topics) {
+ onUnSubscribe(topic);
}
}
UNSUBACK ack = new UNSUBACK();
@@ -332,7 +334,10 @@ class MQTTProtocolConverter {
if (info != null) {
subscriptionsByConsumerId.remove(info.getConsumerId());
}
- RemoveInfo removeInfo = info.createRemoveCommand();
+ RemoveInfo removeInfo = null;
+ if (info != null) {
+ removeInfo = info.createRemoveCommand();
+ }
sendToActiveMQ(removeInfo, null);
}
}
@@ -441,7 +446,7 @@ class MQTTProtocolConverter {
msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
- ActiveMQTopic topic = null;
+ ActiveMQTopic topic;
synchronized (activeMQTopicMap) {
topic = activeMQTopicMap.get(command.topicName());
if (topic == null) {
@@ -679,9 +684,23 @@ class MQTTProtocolConverter {
* Set the default keep alive time (in milliseconds) that would be used if configured on server side
* and the client sends a keep-alive value of 0 (zero) on a CONNECT frame
*
- * @param defaultKeepAlive
+ * @param defaultKeepAlive the keepAlive in milliseconds
*/
public void setDefaultKeepAlive(long defaultKeepAlive) {
this.defaultKeepAlive = defaultKeepAlive;
}
+
+ public int getActiveMQSubscriptionPrefetch() {
+ return activeMQSubscriptionPrefetch;
+ }
+
+ /**
+ * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one
+ * The default = 1
+ * @param activeMQSubscriptionPrefetch set the prefetch for the corresponding ActiveMQ subscription
+ */
+
+ public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
+ this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch;
+ }
}
Modified: activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java?rev=1429809&r1=1429808&r2=1429809&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java (original)
+++ activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java Mon Jan 7 14:32:20 2013
@@ -153,5 +153,19 @@ public class MQTTTransportFilter extends
protocolConverter.setDefaultKeepAlive(defaultHeartBeat);
}
+ public int getActiveMQSubscriptionPrefetch() {
+ return protocolConverter.getActiveMQSubscriptionPrefetch();
+ }
+
+ /**
+ * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one
+ * The default = 1
+ * @param activeMQSubscriptionPrefetch set the prefetch for the corresponding ActiveMQ subscription
+ */
+
+ public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
+ protocolConverter.setActiveMQSubscriptionPrefetch(activeMQSubscriptionPrefetch);
+ }
+
}
Modified: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1429809&r1=1429808&r2=1429809&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java (original)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java Mon Jan 7 14:32:20 2013
@@ -78,7 +78,7 @@ public class MQTTTest {
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(false);
- this.numberOfMessages = 1000;
+ this.numberOfMessages = 3000;
}
@After
@@ -200,6 +200,7 @@ public class MQTTTest {
assertNotNull("Should get a message", message);
LOG.debug(payload);
message.ack();
+ //System.err.println("Sent " + payload + " GOT " + new String(message.getPayload()));
assertEquals(payload, new String(message.getPayload()));
}
subConnection.disconnect();