You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/12/10 14:36:18 UTC
git commit: Fix for https://issues.apache.org/jira/browse/AMQ-4927
Updated Branches:
refs/heads/trunk fe36820b8 -> 6683eb652
Fix for https://issues.apache.org/jira/browse/AMQ-4927
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6683eb65
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6683eb65
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6683eb65
Branch: refs/heads/trunk
Commit: 6683eb652f0f4efe6130bb72410ae36e93b6868a
Parents: fe36820
Author: rajdavies <ra...@gmail.com>
Authored: Tue Dec 10 13:35:39 2013 +0000
Committer: rajdavies <ra...@gmail.com>
Committed: Tue Dec 10 13:36:09 2013 +0000
----------------------------------------------------------------------
.../activemq/broker/jmx/ProducerView.java | 4 +-
.../activemq/broker/jmx/ProducerViewMBean.java | 2 +-
.../activemq/broker/jmx/SubscriptionView.java | 4 +-
.../activemq/broker/region/AbstractRegion.java | 3 +-
.../broker/region/AbstractSubscription.java | 19 +++--
.../activemq/broker/region/Subscription.java | 14 ++--
.../apache/activemq/command/ProducerInfo.java | 17 ++++-
.../transport/mqtt/MQTTProtocolConverter.java | 41 +++++-----
.../transport/mqtt/MQTTRetainedMessages.java | 80 ++++++++++++++++++++
.../transport/mqtt/FuseMQQTTClientProvider.java | 7 +-
.../transport/mqtt/MQTTClientProvider.java | 1 +
.../activemq/transport/mqtt/MQTTTest.java | 51 ++++++++++++-
.../region/QueueDuplicatesFromStoreTest.java | 12 ++-
13 files changed, 208 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
index 6905c72..e211b75 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
@@ -188,12 +188,12 @@ public class ProducerView implements ProducerViewMBean {
@Override
public void resetStatistics() {
if (info != null){
- info.getSentCount().reset();
+ info.resetSentCount();
}
}
@Override
public long getSentCount() {
- return info != null ? info.getSentCount().getCount() :0;
+ return info != null ? info.getSentCount() :0;
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
index 14c2073..4776283 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
@@ -102,7 +102,7 @@ public interface ProducerViewMBean {
@MBeanInfo("Resets statistics.")
void resetStatistics();
- @MBeanInfo("Messages consumed")
+ @MBeanInfo("Messages dispatched by Producer")
long getSentCount();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
index 443a266..deefdb4 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
@@ -421,12 +421,12 @@ public class SubscriptionView implements SubscriptionViewMBean {
@Override
public void resetStatistics() {
if (subscription != null){
- subscription.getConsumedCount().reset();
+ subscription.resetConsumedCount();
}
}
@Override
public long getConsumedCount() {
- return subscription != null ? subscription.getConsumedCount().getCount() : 0;
+ return subscription != null ? subscription.getConsumedCount() : 0;
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
index 16deed4..efa02cb 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
@@ -392,8 +392,9 @@ public abstract class AbstractRegion implements Region {
}
producerExchange.getRegionDestination().send(producerExchange, messageSend);
+
if (producerExchange.getProducerState() != null && producerExchange.getProducerState().getInfo() != null){
- producerExchange.getProducerState().getInfo().getSentCount().increment();
+ producerExchange.getProducerState().getInfo().incrementSentCount();
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
index b2ff01c..3a2e2ee 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
@@ -20,11 +20,11 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.management.ObjectName;
-
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@@ -36,7 +36,6 @@ import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.LogicExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NoLocalExpression;
-import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.selector.SelectorParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +53,7 @@ public abstract class AbstractSubscription implements Subscription {
private int cursorMemoryHighWaterMark = 70;
private boolean slowConsumer;
private long lastAckTime;
- private CountStatisticImpl consumedCount = new CountStatisticImpl("consumed","The number of messages consumed");
+ private AtomicLong consumedCount = new AtomicLong();
public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
this.broker = broker;
@@ -90,7 +89,7 @@ public abstract class AbstractSubscription implements Subscription {
@Override
public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
this.lastAckTime = System.currentTimeMillis();
- this.consumedCount.increment();
+ this.consumedCount.incrementAndGet();
}
@Override
@@ -280,7 +279,15 @@ public abstract class AbstractSubscription implements Subscription {
this.lastAckTime = value;
}
- public CountStatisticImpl getConsumedCount(){
- return consumedCount;
+ public long getConsumedCount(){
+ return consumedCount.get();
+ }
+
+ public void incrementConsumedCount(){
+ consumedCount.incrementAndGet();
+ }
+
+ public void resetConsumedCount(){
+ consumedCount.set(0);
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
index b79b37e..a2c4502 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
@@ -21,7 +21,6 @@ import java.util.List;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
-
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
@@ -30,7 +29,6 @@ import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.management.CountStatisticImpl;
/**
*
@@ -48,7 +46,6 @@ public interface Subscription extends SubscriptionRecovery {
/**
* Used when client acknowledge receipt of dispatched message.
- * @param node
* @throws IOException
* @throws Exception
*/
@@ -70,7 +67,7 @@ public interface Subscription extends SubscriptionRecovery {
/**
* Is the subscription interested in messages in the destination?
- * @param context
+ * @param destination
* @return
*/
boolean matches(ActiveMQDestination destination);
@@ -93,7 +90,6 @@ public interface Subscription extends SubscriptionRecovery {
/**
* The ConsumerInfo object that created the subscription.
- * @param destination
*/
ConsumerInfo getConsumerInfo();
@@ -200,7 +196,7 @@ public interface Subscription extends SubscriptionRecovery {
/**
* Informs the Broker if the subscription needs to intervention to recover it's state
* e.g. DurableTopicSubscriber may do
- * @see org.apache.activemq.region.cursors.PendingMessageCursor
+ * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
* @return true if recovery required
*/
boolean isRecoveryRequired();
@@ -235,6 +231,10 @@ public interface Subscription extends SubscriptionRecovery {
*/
long getTimeOfLastMessageAck();
- CountStatisticImpl getConsumedCount();
+ long getConsumedCount();
+
+ void incrementConsumedCount();
+
+ void resetConsumedCount();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java
index 05ef3a4..7189347 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java
@@ -16,7 +16,8 @@
*/
package org.apache.activemq.command;
-import org.apache.activemq.management.CountStatisticImpl;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.activemq.state.CommandVisitor;
/**
@@ -33,7 +34,7 @@ public class ProducerInfo extends BaseCommand {
protected BrokerId[] brokerPath;
protected boolean dispatchAsync;
protected int windowSize;
- protected CountStatisticImpl sentCount = new CountStatisticImpl("sentCount","number of messages sent to a broker");
+ protected AtomicLong sentCount = new AtomicLong();
public ProducerInfo() {
}
@@ -137,8 +138,16 @@ public class ProducerInfo extends BaseCommand {
this.windowSize = windowSize;
}
- public CountStatisticImpl getSentCount(){
- return sentCount;
+ public long getSentCount(){
+ return sentCount.get();
+ }
+
+ public void incrementSentCount(){
+ sentCount.incrementAndGet();
+ }
+
+ public void resetSentCount(){
+ sentCount.set(0);
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index d4c05eb..34f53a4 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -17,7 +17,6 @@
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -28,8 +27,6 @@ import java.util.zip.Inflater;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
-
-import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.*;
import org.apache.activemq.store.PersistenceAdapterSupport;
@@ -44,21 +41,7 @@ import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
-import org.fusesource.mqtt.codec.CONNACK;
-import org.fusesource.mqtt.codec.CONNECT;
-import org.fusesource.mqtt.codec.DISCONNECT;
-import org.fusesource.mqtt.codec.MQTTFrame;
-import org.fusesource.mqtt.codec.PINGREQ;
-import org.fusesource.mqtt.codec.PINGRESP;
-import org.fusesource.mqtt.codec.PUBACK;
-import org.fusesource.mqtt.codec.PUBCOMP;
-import org.fusesource.mqtt.codec.PUBLISH;
-import org.fusesource.mqtt.codec.PUBREC;
-import org.fusesource.mqtt.codec.PUBREL;
-import org.fusesource.mqtt.codec.SUBACK;
-import org.fusesource.mqtt.codec.SUBSCRIBE;
-import org.fusesource.mqtt.codec.UNSUBACK;
-import org.fusesource.mqtt.codec.UNSUBSCRIBE;
+import org.fusesource.mqtt.codec.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,10 +79,12 @@ public class MQTTProtocolConverter {
private long defaultKeepAlive;
private int activeMQSubscriptionPrefetch=1;
private final String QOS_PROPERTY_NAME = "QoSPropertyName";
+ private final MQTTRetainedMessages retainedMessages;
public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) {
this.mqttTransport = mqttTransport;
this.brokerService = brokerService;
+ this.retainedMessages = MQTTRetainedMessages.getMQTTRetainedMessages(brokerService);
this.defaultKeepAlive = 0;
}
@@ -319,6 +304,23 @@ public class MQTTProtocolConverter {
} else {
LOG.warn("No topics defined for Subscription " + command);
}
+ //check retained messages
+ if (topics != null){
+ for (Topic topic:topics){
+ Buffer buffer = retainedMessages.getMessage(topic.name().toString());
+ if (buffer != null){
+ PUBLISH msg = new PUBLISH();
+ msg.payload(buffer);
+ msg.topicName(topic.name());
+ try {
+ getMQTTTransport().sendToMQTT(msg.encode());
+ } catch (IOException e) {
+ LOG.warn("Couldn't send retained message " + msg, e);
+ }
+ }
+ }
+ }
+
}
QoS onSubscribe(Topic topic) throws MQTTProtocolException {
@@ -415,6 +417,9 @@ public class MQTTProtocolConverter {
void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
checkConnected();
+ if (command.retain()){
+ retainedMessages.addMessage(command.topicName().toString(),command.payload());
+ }
ActiveMQMessage message = convertMessage(command);
message.setProducerId(producerId);
message.onSend();
http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
new file mode 100644
index 0000000..e502dce
--- /dev/null
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.LRUCache;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MQTTRetainedMessages extends ServiceSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(MQTTRetainedMessages.class);
+ private static final Object LOCK = new Object();
+ private LRUCache<String,Buffer> cache = new LRUCache<String, Buffer>(10000);
+
+ private MQTTRetainedMessages(){
+ }
+
+ @Override
+ protected void doStop(ServiceStopper stopper) throws Exception {
+ cache.clear();
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ }
+
+ public void addMessage(String destination,Buffer payload){
+ cache.put(destination,payload);
+ }
+
+ public Buffer getMessage(String destination){
+ return cache.get(destination);
+ }
+
+ public static MQTTRetainedMessages getMQTTRetainedMessages(BrokerService broker){
+ MQTTRetainedMessages result = null;
+ if (broker != null){
+ synchronized (LOCK){
+ Service[] services = broker.getServices();
+ if (services != null){
+ for (Service service:services){
+ if (service instanceof MQTTRetainedMessages){
+ return (MQTTRetainedMessages) service;
+ }
+ }
+ }
+ result = new MQTTRetainedMessages();
+ broker.addService(result);
+ if (broker != null && broker.isStarted()){
+ try {
+ result.start();
+ } catch (Exception e) {
+ LOG.warn("Couldn't start MQTTRetainedMessages");
+ }
+ }
+ }
+ }
+
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java
index 257517b..d329066 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java
@@ -48,7 +48,12 @@ class FuseMQQTTClientProvider implements MQTTClientProvider {
@Override
public void publish(String topic, byte[] payload, int qos) throws Exception {
- connection.publish(topic,payload, QoS.values()[qos],false);
+ publish(topic,payload,qos,false);
+ }
+
+ @Override
+ public void publish(String topic, byte[] payload, int qos, boolean retained) throws Exception {
+ connection.publish(topic,payload, QoS.values()[qos],retained);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java
index e5d411a..574a6d8 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.mqtt;
public interface MQTTClientProvider {
void connect(String host) throws Exception;
void disconnect() throws Exception;
+ public void publish(String topic, byte[] payload, int qos, boolean retained) throws Exception;
void publish(String topic,byte[] payload,int qos) throws Exception;
void subscribe(String topic,int qos) throws Exception;
void unsubscribe(String topic) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 994be67..bf4fac5 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -16,9 +16,18 @@
*/
package org.apache.activemq.transport.mqtt;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.TransportConnector;
@@ -35,9 +44,6 @@ import org.fusesource.mqtt.codec.MQTTFrame;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import javax.jms.*;
-
import static org.junit.Assert.assertArrayEquals;
public class MQTTTest extends AbstractMQTTTest {
@@ -281,6 +287,45 @@ public class MQTTTest extends AbstractMQTTTest {
}
@Test(timeout=60 * 1000)
+ public void testSendAndReceiveRetainedMessages() throws Exception {
+
+ addMQTTConnector();
+ brokerService.start();
+
+ final MQTTClientProvider publisher = getMQTTClientProvider();
+ initializeConnection(publisher);
+
+ final MQTTClientProvider subscriber = getMQTTClientProvider();
+ initializeConnection(subscriber);
+
+ String RETAINED = "retained";
+ publisher.publish("foo",RETAINED.getBytes(),AT_LEAST_ONCE,true);
+
+ List<String> messages = new ArrayList<String>();
+ for (int i = 0; i < 10; i++){
+ messages.add("TEST MESSAGE:" + i);
+ }
+
+ subscriber.subscribe("foo",AT_LEAST_ONCE);
+
+ for (int i = 0; i < 10; i++) {
+ publisher.publish("foo", messages.get(i).getBytes(), AT_LEAST_ONCE);
+ }
+ byte[] msg = subscriber.receive(5000);
+ assertNotNull(msg);
+ assertEquals(RETAINED,new String(msg));
+
+ for (int i =0; i < 10; i++){
+ msg = subscriber.receive(5000);
+ assertNotNull(msg);
+ assertEquals(messages.get(i),new String(msg));
+ }
+ subscriber.disconnect();
+ publisher.disconnect();
+ }
+
+
+ @Test(timeout=60 * 1000)
public void testSendMQTTReceiveJMS() throws Exception {
addMQTTConnector();
TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
index 9da839d..e9c6664 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
@@ -342,8 +342,16 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
}
@Override
- public CountStatisticImpl getConsumedCount() {
- return null;
+ public long getConsumedCount() {
+ return 0;
+ }
+
+ public void incrementConsumedCount(){
+
+ }
+
+ public void resetConsumedCount(){
+
}
};