You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/07/02 23:05:00 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5872

Repository: activemq
Updated Branches:
  refs/heads/master 8e7556f39 -> 455f1ca47


https://issues.apache.org/jira/browse/AMQ-5872

The code that adds subscriptions for retroactive subscribers did not
account for the case where the last ack position in the ackLocations
table is used as a placeholder for the next incoming message, in which
case there may not be an element in the messageReferences tracker for
that index, especially after a restart when the index is reloaded.  The
code needs to check for this when it iterates over the existing message
references to increment their counts, in order to avoid the NPE.

Also cleaned up the MQTT tests so that they place their data directory
under ./target, which means old stores get removed on 'mvn clean'.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/455f1ca4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/455f1ca4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/455f1ca4

Branch: refs/heads/master
Commit: 455f1ca475be252b6ffdd07176bf659b81685ea7
Parents: 8e7556f
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jul 2 17:04:35 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jul 2 17:04:35 2015 -0400

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  25 ++--
 .../transport/mqtt/MQTTMaxFrameSizeTest.java    |   4 +-
 .../mqtt/MQTTOverlapedSubscriptionsTest.java    | 117 +++++++++++++++++++
 .../mqtt/MQTTSubscriptionRecoveryTest.java      |   4 +-
 .../transport/mqtt/MQTTTestSupport.java         |  10 +-
 5 files changed, 144 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/455f1ca4/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 0ff203c..ef8fe0a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -248,7 +248,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     private boolean checksumJournalFiles = true;
     protected boolean forceRecoverIndex = false;
     private final Object checkpointThreadLock = new Object();
-    private boolean rewriteOnRedelivery = false;
     private boolean archiveCorruptedIndex = false;
     private boolean useIndexLFRUEviction = false;
     private float indexLFUEvictionFactor = 0.2f;
@@ -1161,7 +1160,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         }
     }
 
-    @SuppressWarnings("rawtypes")
     protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
         this.indexLock.writeLock().lock();
         try {
@@ -2153,7 +2151,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 SequenceSet pendingAcks = subscription.getValue();
                 if (pendingAcks != null && !pendingAcks.isEmpty()) {
                     Long lastPendingAck = pendingAcks.getTail().getLast();
-                    for(Long sequenceId : pendingAcks) {
+                    for (Long sequenceId : pendingAcks) {
                         Long current = rc.messageReferences.get(sequenceId);
                         if (current == null) {
                             current = new Long(0);
@@ -2163,6 +2161,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                         // so we need to ensure we don't count that as a message reference on reload.
                         if (!sequenceId.equals(lastPendingAck)) {
                             current = current.longValue() + 1;
+                        } else {
+                            current = Long.valueOf(0L);
                         }
 
                         rc.messageReferences.put(sequenceId, current);
@@ -2235,8 +2235,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
         for (Long ackPosition : allOutstanding) {
             Long count = sd.messageReferences.get(ackPosition);
-            count = count.longValue() + 1;
-            sd.messageReferences.put(ackPosition, count);
+
+            // There might not be a reference if the ackLocation was the last
+            // one which is a placeholder for the next incoming message and
+            // no value was added to the message references table.
+            if (count != null) {
+                count = count.longValue() + 1;
+                sd.messageReferences.put(ackPosition, count);
+            }
         }
     }
 
@@ -2259,7 +2265,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             }
             count = count.longValue() + 1;
             sd.messageReferences.put(messageSequence, count);
-            sd.messageReferences.put(messageSequence+1, Long.valueOf(0L));
+            sd.messageReferences.put(messageSequence + 1, Long.valueOf(0L));
         }
     }
 
@@ -2322,8 +2328,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
                 // Check if the message is reference by any other subscription.
                 Long count = sd.messageReferences.get(messageSequence);
-                if (count != null){
-                long references = count.longValue() - 1;
+                if (count != null) {
+                    long references = count.longValue() - 1;
                     if (references > 0) {
                         sd.messageReferences.put(messageSequence, Long.valueOf(references));
                         return;
@@ -3050,7 +3056,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             }
         }
 
-
         class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
             Iterator<Entry<Long, MessageKeys>>currentIterator;
             final Iterator<Entry<Long, MessageKeys>>highIterator;
@@ -3145,7 +3150,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             public void remove() {
                 throw new UnsupportedOperationException();
             }
-
         }
     }
 
@@ -3209,5 +3213,4 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     public void setPreallocationStrategy(String preallocationStrategy) {
         this.preallocationStrategy = preallocationStrategy;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/455f1ca4/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java
index 029de93..e5282b3 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTMaxFrameSizeTest.java
@@ -68,7 +68,7 @@ public class MQTTMaxFrameSizeTest extends MQTTTestSupport {
         LOG.debug("Starting test on connector {} for frame size: {}", getProtocolScheme(), maxFrameSize);
 
         MQTT mqtt = createMQTTConnection();
-        mqtt.setClientId(getName());
+        mqtt.setClientId(getTestName());
         mqtt.setKeepAlive((short) 10);
         mqtt.setVersion("3.1.1");
 
@@ -97,7 +97,7 @@ public class MQTTMaxFrameSizeTest extends MQTTTestSupport {
         LOG.debug("Starting test on connector {} for frame size: {}", getProtocolScheme(), maxFrameSize);
 
         MQTT mqtt = createMQTTConnection();
-        mqtt.setClientId(getName());
+        mqtt.setClientId(getTestName());
         mqtt.setKeepAlive((short) 10);
         mqtt.setVersion("3.1.1");
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/455f1ca4/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTOverlapedSubscriptionsTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTOverlapedSubscriptionsTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTOverlapedSubscriptionsTest.java
new file mode 100644
index 0000000..33fb61b
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTOverlapedSubscriptionsTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MQTTOverlapedSubscriptionsTest {
+
+    private BrokerService brokerService;
+    private String mqttClientUrl;
+
+    @Before
+    public void setup() throws Exception {
+        initializeBroker(true);
+    }
+
+    @After
+    public void shutdown() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+
+    protected void initializeBroker(boolean deleteAllMessagesOnStart) throws Exception {
+
+        brokerService = new BrokerService();
+        brokerService.setPersistent(true);
+        brokerService.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStart);
+        TransportConnector connector = new TransportConnector();
+        connector.setUri(new URI("mqtt://localhost:0"));
+        connector.setName("mqtt");
+        brokerService.addConnector(connector);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        mqttClientUrl = connector.getPublishableConnectString().replace("mqtt", "tcp");
+    }
+
+    @Test
+    public void testMqttResubscribe() throws Exception {
+        // inactive durable consumer on test/1 will be left on the broker after restart
+        doTest("test/1");
+
+        shutdown();
+        initializeBroker(false);
+
+        // new consumer on test/# will match all messages sent to the inactive sub
+        doTest("test/#");
+    }
+
+    private BlockingConnection getConnection(String host, String clientId) throws URISyntaxException, Exception {
+        BlockingConnection conn;
+        MQTT mqttPub = new MQTT();
+        mqttPub.setHost(host);
+        mqttPub.setConnectAttemptsMax(0);
+        mqttPub.setReconnectAttemptsMax(0);
+        mqttPub.setClientId(clientId);
+        mqttPub.setCleanSession(false);
+        conn = mqttPub.blockingConnection();
+        conn.connect();
+        return conn;
+    }
+
+    public void doTest(String subscribe) throws Exception {
+        String payload = "This is test payload";
+        BlockingConnection connectionPub = getConnection(mqttClientUrl, "client1");
+        BlockingConnection connectionSub = getConnection(mqttClientUrl, "client2");
+        Topic[] topics = { new Topic(subscribe, QoS.values()[1]) };
+        connectionSub.subscribe(topics);
+        connectionPub.publish("test/1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
+        receive(connectionSub, 3000);
+
+        //Unsubscribe and resubscribe
+        connectionSub.unsubscribe(new String[]{subscribe});
+        connectionSub.subscribe(topics);
+        connectionPub.publish(subscribe, payload.getBytes(), QoS.AT_LEAST_ONCE, false);
+        receive(connectionSub, 3000);
+
+        connectionPub.disconnect();
+        connectionSub.disconnect();
+    }
+
+    public byte[] receive(BlockingConnection connection, int timeout) throws Exception {
+        byte[] result = null;
+        org.fusesource.mqtt.client.Message message = connection.receive(timeout, TimeUnit.MILLISECONDS);
+        if (message != null) {
+            result = message.getPayload();
+            message.ack();
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/455f1ca4/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSubscriptionRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSubscriptionRecoveryTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSubscriptionRecoveryTest.java
index 61f74cc..0b7f958 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSubscriptionRecoveryTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSubscriptionRecoveryTest.java
@@ -72,7 +72,7 @@ public class MQTTSubscriptionRecoveryTest extends MQTTTestSupport {
     @Test
     public void testDurableSubscriptionsAreRecovered() throws Exception {
 
-        MqttClient connection = createClient(getName());
+        MqttClient connection = createClient(getTestName());
 
         final String[] topics = { "TopicA/", "TopicB/", "TopicC/" };
         for (int i = 0; i < topics.length; i++) {
@@ -90,7 +90,7 @@ public class MQTTSubscriptionRecoveryTest extends MQTTTestSupport {
 
         assertStatsForDisconnectedClient(topics.length);
 
-        connection = createClient(getName());
+        connection = createClient(getTestName());
 
         assertStatsForConnectedClient(topics.length);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/455f1ca4/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
index 18aee54..0b58687 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
@@ -44,6 +44,7 @@ import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.activemq.store.kahadb.KahaDBStore;
 import org.apache.activemq.transport.mqtt.util.ResourceLoadingSslContext;
 import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.Tracer;
@@ -59,6 +60,8 @@ public class MQTTTestSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(MQTTTestSupport.class);
 
+    public static final String KAHADB_DIRECTORY = "target/activemq-data/";
+
     protected BrokerService brokerService;
     protected int port;
     protected String jmsUri = "vm://localhost";
@@ -90,7 +93,7 @@ public class MQTTTestSupport {
         this.useSSL = useSSL;
     }
 
-    public String getName() {
+    public String getTestName() {
         return name.getMethodName();
     }
 
@@ -144,6 +147,11 @@ public class MQTTTestSupport {
         BrokerService brokerService = new BrokerService();
         brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
         brokerService.setPersistent(isPersistent());
+        if (isPersistent()) {
+            KahaDBStore kaha = new KahaDBStore();
+            kaha.setDirectory(new File(KAHADB_DIRECTORY + getTestName()));
+            brokerService.setPersistenceAdapter(kaha);
+        }
         brokerService.setAdvisorySupport(false);
         brokerService.setUseJmx(true);
         brokerService.getManagementContext().setCreateConnector(false);