You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/01/07 15:32:21 UTC

svn commit: r1429809 - in /activemq/trunk/activemq-mqtt/src: main/java/org/apache/activemq/transport/mqtt/ test/java/org/apache/activemq/transport/mqtt/

Author: rajdavies
Date: Mon Jan  7 14:32:20 2013
New Revision: 1429809

URL: http://svn.apache.org/viewvc?rev=1429809&view=rev
Log:
Fix instability in test cases

Modified:
    activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
    activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
    activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java

Modified: activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?rev=1429809&r1=1429808&r2=1429809&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java (original)
+++ activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java Mon Jan  7 14:32:20 2013
@@ -85,6 +85,7 @@ class MQTTProtocolConverter {
     private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
     private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
     private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 1.5;
+    private static final int DEFAULT_CACHE_SIZE = 5000;
 
     private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
     private final SessionId sessionId = new SessionId(connectionId, -1);
@@ -95,10 +96,10 @@ class MQTTProtocolConverter {
     private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
     private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
     private final ConcurrentHashMap<UTF8Buffer, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<UTF8Buffer, MQTTSubscription>();
-    private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>();
-    private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>();
-    private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>();
-    private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>();
+    private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer, ActiveMQTopic>(DEFAULT_CACHE_SIZE);
+    private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>(DEFAULT_CACHE_SIZE);
+    private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
+    private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);
     private final MQTTTransport mqttTransport;
 
     private final Object commnadIdMutex = new Object();
@@ -108,6 +109,7 @@ class MQTTProtocolConverter {
     private CONNECT connect;
     private String clientId;
     private long defaultKeepAlive;
+    private int activeMQSubscriptionPrefetch=1;
     private final String QOS_PROPERTY_NAME = "QoSPropertyName";
 
     public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) {
@@ -125,7 +127,7 @@ class MQTTProtocolConverter {
         command.setCommandId(generateCommandId());
         if (handler != null) {
             command.setResponseRequired(true);
-            resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
+            resposeHandlers.put(command.getCommandId(), handler);
         }
         mqttTransport.sendToActiveMQ(command);
     }
@@ -297,7 +299,7 @@ class MQTTProtocolConverter {
         ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
         ConsumerInfo consumerInfo = new ConsumerInfo(id);
         consumerInfo.setDestination(destination);
-        consumerInfo.setPrefetchSize(1000);
+        consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
         consumerInfo.setDispatchAsync(true);
         if (!connect.cleanSession() && (connect.clientId() != null)) {
             //by default subscribers are persistent
@@ -316,8 +318,8 @@ class MQTTProtocolConverter {
     void onUnSubscribe(UNSUBSCRIBE command) {
         UTF8Buffer[] topics = command.topics();
         if (topics != null) {
-            for (int i = 0; i < topics.length; i++) {
-                onUnSubscribe(topics[i]);
+            for (UTF8Buffer topic : topics) {
+                onUnSubscribe(topic);
             }
         }
         UNSUBACK ack = new UNSUBACK();
@@ -332,7 +334,10 @@ class MQTTProtocolConverter {
             if (info != null) {
                 subscriptionsByConsumerId.remove(info.getConsumerId());
             }
-            RemoveInfo removeInfo = info.createRemoveCommand();
+            RemoveInfo removeInfo = null;
+            if (info != null) {
+                removeInfo = info.createRemoveCommand();
+            }
             sendToActiveMQ(removeInfo, null);
         }
     }
@@ -441,7 +446,7 @@ class MQTTProtocolConverter {
         msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE);
         msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
 
-        ActiveMQTopic topic = null;
+        ActiveMQTopic topic;
         synchronized (activeMQTopicMap) {
             topic = activeMQTopicMap.get(command.topicName());
             if (topic == null) {
@@ -679,9 +684,23 @@ class MQTTProtocolConverter {
      * Set the default keep alive time (in milliseconds) that would be used if configured on server side
      * and the client sends a keep-alive value of 0 (zero) on a CONNECT frame
      *
-     * @param defaultKeepAlive
+     * @param defaultKeepAlive the keepAlive in milliseconds
      */
     public void setDefaultKeepAlive(long defaultKeepAlive) {
         this.defaultKeepAlive = defaultKeepAlive;
     }
+
+    public int getActiveMQSubscriptionPrefetch() {
+        return activeMQSubscriptionPrefetch;
+    }
+
+    /**
+     * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one
+     * The default = 1
+     * @param activeMQSubscriptionPrefetch set the prefetch for the corresponding ActiveMQ subscription
+     */
+
+    public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
+        this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch;
+    }
 }

Modified: activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java?rev=1429809&r1=1429808&r2=1429809&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java (original)
+++ activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java Mon Jan  7 14:32:20 2013
@@ -153,5 +153,19 @@ public class MQTTTransportFilter extends
         protocolConverter.setDefaultKeepAlive(defaultHeartBeat);
     }
 
+    public int getActiveMQSubscriptionPrefetch() {
+        return protocolConverter.getActiveMQSubscriptionPrefetch();
+    }
+
+    /**
+     * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one
+     * The default = 1
+     * @param activeMQSubscriptionPrefetch set the prefetch for the corresponding ActiveMQ subscription
+     */
+
+    public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
+        protocolConverter.setActiveMQSubscriptionPrefetch(activeMQSubscriptionPrefetch);
+    }
+
 
 }

Modified: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1429809&r1=1429808&r2=1429809&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java (original)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java Mon Jan  7 14:32:20 2013
@@ -78,7 +78,7 @@ public class MQTTTest {
         brokerService.setPersistent(false);
         brokerService.setAdvisorySupport(false);
         brokerService.setUseJmx(false);
-        this.numberOfMessages = 1000;
+        this.numberOfMessages = 3000;
     }
 
     @After
@@ -200,6 +200,7 @@ public class MQTTTest {
             assertNotNull("Should get a message", message);
             LOG.debug(payload);
             message.ack();
+            //System.err.println("Sent " + payload + " GOT " + new String(message.getPayload()));
             assertEquals(payload, new String(message.getPayload()));
         }
         subConnection.disconnect();