You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/08/06 23:41:47 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5290
Repository: activemq
Updated Branches:
refs/heads/trunk fff3c8397 -> 413e4840d
https://issues.apache.org/jira/browse/AMQ-5290
Adds a subscription strategy model. The default strategy is the normal
durable topic subscription based approach; the alternative strategy maps
all subscriptions and publish operations to a Virtual Topic model. A
network of brokers can network the Queues instead of having the durable
topic subscriptions repeated on each Broker.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/413e4840
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/413e4840
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/413e4840
Branch: refs/heads/trunk
Commit: 413e4840d64fa8377103ce431a2a2c6cef6e0c20
Parents: fff3c83
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Aug 6 17:41:19 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Aug 6 17:41:19 2014 -0400
----------------------------------------------------------------------
.../transport/mqtt/MQTTInactivityMonitor.java | 18 +-
.../transport/mqtt/MQTTNIOTransportFactory.java | 10 +-
.../transport/mqtt/MQTTProtocolConverter.java | 377 ++++++++-----------
.../transport/mqtt/MQTTSubscription.java | 7 +-
.../transport/mqtt/MQTTTransportFactory.java | 14 +-
.../transport/mqtt/MQTTTransportFilter.java | 24 +-
.../AbstractMQTTSubscriptionStrategy.java | 129 +++++++
.../MQTTDefaultSubscriptionStrategy.java | 151 ++++++++
.../mqtt/strategy/MQTTSubscriptionStrategy.java | 139 +++++++
.../MQTTVirtualTopicSubscriptionStrategy.java | 221 +++++++++++
.../strategies/mqtt-default-subscriptions | 17 +
.../strategies/mqtt-virtual-topic-subscriptions | 17 +
.../activemq/transport/mqtt/MQTTTest.java | 95 ++++-
.../mqtt/MQTTVirtualTopicSubscriptionsTest.java | 59 +++
14 files changed, 1038 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
index 685bb60..22bbac5 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
@@ -70,9 +70,12 @@ public class MQTTInactivityMonitor extends TransportFilter {
int currentCounter = next.getReceiveCounter();
int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
- // for the PINGREQ/RESP frames, the currentCounter will be different from previousCounter, and that
- // should be sufficient to indicate the connection is still alive. If there were random data, or something
- // outside the scope of the spec, the wire format unrmarshalling would fail, so we don't need to handle
+ // for the PINGREQ/RESP frames, the currentCounter will be different
+ // from previousCounter, and that
+ // should be sufficient to indicate the connection is still alive.
+ // If there were random data, or something
+ // outside the scope of the spec, the wire format unrmarshalling
+ // would fail, so we don't need to handle
// PINGREQ/RESP explicitly here
if (inReceive.get() || currentCounter != previousCounter) {
if (LOG.isTraceEnabled()) {
@@ -82,24 +85,21 @@ public class MQTTInactivityMonitor extends TransportFilter {
return;
}
- if( (now-lastReceiveTime) >= readKeepAliveTime+readGraceTime && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
+ if ((now - lastReceiveTime) >= readKeepAliveTime + readGraceTime && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
if (LOG.isDebugEnabled()) {
LOG.debug("No message received since last read check for " + MQTTInactivityMonitor.this.toString() + "! Throwing InactivityIOException.");
}
ASYNC_TASKS.execute(new Runnable() {
@Override
public void run() {
- onException(new InactivityIOException("Channel was inactive for too (>" + (readKeepAliveTime+readGraceTime) + ") long: " + next.getRemoteAddress()));
+ onException(new InactivityIOException("Channel was inactive for too (>" + (readKeepAliveTime + readGraceTime) + ") long: "
+ + next.getRemoteAddress()));
}
});
}
}
};
- private boolean allowReadCheck(long elapsed) {
- return elapsed > (readGraceTime * 9 / 10);
- }
-
public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
super(next);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
index 96f7747..efc85be 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
@@ -26,7 +26,7 @@ import java.util.Map;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
-import org.apache.activemq.broker.BrokerContext;
+
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.MutexTransport;
@@ -44,12 +44,15 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
private BrokerService brokerService = null;
+ @Override
protected String getDefaultWireFormatType() {
return "mqtt";
}
+ @Override
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory) {
+ @Override
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
return new MQTTNIOTransport(format, socket);
}
@@ -58,6 +61,7 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
return result;
}
+ @Override
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
return new MQTTNIOTransport(wf, socketFactory, location, localLocation);
}
@@ -75,6 +79,7 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
return transport;
}
+ @Override
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new MQTTTransportFilter(transport, format, brokerService);
@@ -82,16 +87,17 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
return super.compositeConfigure(transport, format, options);
}
+ @Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
+ @Override
protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
MQTTInactivityMonitor monitor = new MQTTInactivityMonitor(transport, format);
MQTTTransportFilter filter = transport.narrow(MQTTTransportFilter.class);
filter.setInactivityMonitor(monitor);
return monitor;
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 91087d0..3969d87 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -17,11 +17,7 @@
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.DataFormatException;
@@ -34,19 +30,13 @@ import javax.jms.Message;
import javax.security.auth.login.CredentialException;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.PrefetchSubscription;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.TopicRegion;
+import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
-import org.apache.activemq.broker.region.virtual.VirtualTopicInterceptor;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
@@ -60,17 +50,17 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.command.SubscriptionInfo;
-import org.apache.activemq.store.PersistenceAdapterSupport;
+import org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.LongSequenceGenerator;
import org.fusesource.hawtbuf.Buffer;
@@ -114,9 +104,8 @@ public class MQTTProtocolConverter {
private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
private final ConcurrentHashMap<String, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<String, MQTTSubscription>();
- private final Map<String, ActiveMQTopic> activeMQTopicMap = new LRUCache<String, ActiveMQTopic>(DEFAULT_CACHE_SIZE);
+ private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE);
private final Map<Destination, String> mqttTopicMap = new LRUCache<Destination, String>(DEFAULT_CACHE_SIZE);
- private final Set<String> restoredSubs = Collections.synchronizedSet(new HashSet<String>());
private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);
@@ -136,6 +125,15 @@ public class MQTTProtocolConverter {
private final MQTTPacketIdGenerator packetIdGenerator;
private boolean publishDollarTopics;
+ private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/");
+ /*
+ * Subscription strategy configuration element.
+ * > mqtt-default-subscriptions
+ * > mqtt-virtual-topic-subscriptions
+ */
+ private String subscriptionStrategyName = "mqtt-default-subscriptions";
+ private MQTTSubscriptionStrategy subsciptionStrategy;
+
public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) {
this.mqttTransport = mqttTransport;
this.brokerService = brokerService;
@@ -149,22 +147,26 @@ public class MQTTProtocolConverter {
}
}
- void sendToActiveMQ(Command command, ResponseHandler handler) {
+ public void sendToActiveMQ(Command command, ResponseHandler handler) {
// Lets intercept message send requests..
if (command instanceof ActiveMQMessage) {
ActiveMQMessage msg = (ActiveMQMessage) command;
- if (!getPublishDollarTopics() && msg.getDestination().getPhysicalName().startsWith("$")) {
- // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1
- // specification requirements
- if (handler != null) {
- try {
- handler.onResponse(this, new Response());
- } catch (IOException e) {
- e.printStackTrace();
+ try {
+ if (!getPublishDollarTopics() && getSubscriptionStrategy().isControlTopic(msg.getDestination())) {
+ // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1
+ // specification requirements for system assigned destinations.
+ if (handler != null) {
+ try {
+ handler.onResponse(this, new Response());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
+ return;
}
- return;
+ } catch (IOException e) {
+ e.printStackTrace();
}
}
@@ -189,54 +191,43 @@ public class MQTTProtocolConverter {
*/
public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException {
switch (frame.messageType()) {
- case PINGREQ.TYPE: {
+ case PINGREQ.TYPE:
LOG.debug("Received a ping from client: " + getClientId());
sendToMQTT(PING_RESP_FRAME);
LOG.debug("Sent Ping Response to " + getClientId());
break;
- }
- case CONNECT.TYPE: {
+ case CONNECT.TYPE:
CONNECT connect = new CONNECT().decode(frame);
onMQTTConnect(connect);
LOG.debug("MQTT Client {} connected. (version: {})", getClientId(), connect.version());
break;
- }
- case DISCONNECT.TYPE: {
+ case DISCONNECT.TYPE:
LOG.debug("MQTT Client {} disconnecting", getClientId());
onMQTTDisconnect();
break;
- }
- case SUBSCRIBE.TYPE: {
+ case SUBSCRIBE.TYPE:
onSubscribe(new SUBSCRIBE().decode(frame));
break;
- }
- case UNSUBSCRIBE.TYPE: {
+ case UNSUBSCRIBE.TYPE:
onUnSubscribe(new UNSUBSCRIBE().decode(frame));
break;
- }
- case PUBLISH.TYPE: {
+ case PUBLISH.TYPE:
onMQTTPublish(new PUBLISH().decode(frame));
break;
- }
- case PUBACK.TYPE: {
+ case PUBACK.TYPE:
onMQTTPubAck(new PUBACK().decode(frame));
break;
- }
- case PUBREC.TYPE: {
+ case PUBREC.TYPE:
onMQTTPubRec(new PUBREC().decode(frame));
break;
- }
- case PUBREL.TYPE: {
+ case PUBREL.TYPE:
onMQTTPubRel(new PUBREL().decode(frame));
break;
- }
- case PUBCOMP.TYPE: {
+ case PUBCOMP.TYPE:
onMQTTPubComp(new PUBCOMP().decode(frame));
break;
- }
- default: {
+ default:
handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
- }
}
}
@@ -332,52 +323,17 @@ public class MQTTProtocolConverter {
connected.set(true);
getMQTTTransport().sendToMQTT(ack.encode());
- List<SubscriptionInfo> subs = PersistenceAdapterSupport.listSubscriptions(brokerService.getPersistenceAdapter(), connectionInfo.getClientId());
if (connect.cleanSession()) {
packetIdGenerator.stopClientSession(getClientId());
- deleteDurableSubs(subs);
} else {
packetIdGenerator.startClientSession(getClientId());
- restoreDurableSubs(subs);
}
- }
- });
- }
- });
- }
- public void deleteDurableSubs(List<SubscriptionInfo> subs) {
- try {
- for (SubscriptionInfo sub : subs) {
- RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
- rsi.setConnectionId(connectionId);
- rsi.setSubscriptionName(sub.getSubcriptionName());
- rsi.setClientId(sub.getClientId());
- sendToActiveMQ(rsi, new ResponseHandler() {
- @Override
- public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
- // ignore failures..
+ getSubscriptionStrategy().onConnect(connect);
}
});
}
- } catch (Throwable e) {
- LOG.warn("Could not delete the MQTT durable subs.", e);
- }
- }
-
- public void restoreDurableSubs(List<SubscriptionInfo> subs) {
- try {
- for (SubscriptionInfo sub : subs) {
- String name = sub.getSubcriptionName();
- String[] split = name.split(":", 2);
- QoS qoS = QoS.valueOf(split[0]);
- onSubscribe(new Topic(split[1], qoS));
- // mark this durable subscription as restored by Broker
- restoredSubs.add(split[1]);
- }
- } catch (IOException e) {
- LOG.warn("Could not restore the MQTT durable subs.", e);
- }
+ });
}
void onMQTTDisconnect() throws MQTTProtocolException {
@@ -408,42 +364,41 @@ public class MQTTProtocolConverter {
} else {
LOG.warn("No topics defined for Subscription " + command);
}
-
}
- byte onSubscribe(final Topic topic) throws MQTTProtocolException {
+ public byte onSubscribe(final Topic topic) throws MQTTProtocolException {
- final String topicName = topic.name().toString();
- final QoS topicQoS = topic.qos();
+ final String destinationName = topic.name().toString();
+ final QoS requestedQoS = topic.qos();
- if (mqttSubscriptionByTopic.containsKey(topicName)) {
- final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(topicName);
- if (topicQoS != mqttSubscription.qos()) {
+ if (mqttSubscriptionByTopic.containsKey(destinationName)) {
+ final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(destinationName);
+ if (requestedQoS != mqttSubscription.getQoS()) {
// remove old subscription as the QoS has changed
- onUnSubscribe(topicName);
+ onUnSubscribe(destinationName);
} else {
- // duplicate SUBSCRIBE packet, find all matching topics and re-send retained messages
- resendRetainedMessages(mqttSubscription);
- return (byte) topicQoS.ordinal();
+ try {
+ getSubscriptionStrategy().onReSubscribe(mqttSubscription);
+ } catch (IOException e) {
+ throw new MQTTProtocolException("Failed to find subscription strategy", true, e);
+ }
+ return (byte) requestedQoS.ordinal();
}
}
- ActiveMQDestination destination = new ActiveMQTopic(MQTTProtocolSupport.convertMQTTToActiveMQ(topicName));
-
- ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
- ConsumerInfo consumerInfo = new ConsumerInfo(id);
- consumerInfo.setDestination(destination);
- consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
- consumerInfo.setRetroactive(true);
- consumerInfo.setDispatchAsync(true);
- // create durable subscriptions only when cleansession is false
- if (!connect.cleanSession() && connect.clientId() != null && topicQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
- consumerInfo.setSubscriptionName(topicQoS + ":" + topicName);
+ try {
+ return getSubscriptionStrategy().onSubscribe(destinationName, requestedQoS);
+ } catch (IOException e) {
+ throw new MQTTProtocolException("Failed while intercepting subscribe", true, e);
}
- MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicName, topicQoS, consumerInfo);
+ }
+
+ public byte doSubscribe(ConsumerInfo consumerInfo, final String topicName, final QoS qoS) throws MQTTProtocolException {
+
+ MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicName, qoS, consumerInfo);
// optimistic add to local maps first to be able to handle commands in onActiveMQCommand
- subscriptionsByConsumerId.put(id, mqttSubscription);
+ subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), mqttSubscription);
mqttSubscriptionByTopic.put(topicName, mqttSubscription);
final byte[] qos = {-1};
@@ -453,79 +408,24 @@ public class MQTTProtocolConverter {
// validate subscription request
if (response.isException()) {
final Throwable throwable = ((ExceptionResponse) response).getException();
- LOG.warn("Error subscribing to " + topicName, throwable);
+ LOG.warn("Error subscribing to {}", topicName, throwable);
qos[0] = SUBSCRIBE_ERROR;
} else {
- qos[0] = (byte) topicQoS.ordinal();
+ qos[0] = (byte) qoS.ordinal();
}
}
});
if (qos[0] == SUBSCRIBE_ERROR) {
// remove from local maps if subscribe failed
- subscriptionsByConsumerId.remove(id);
+ subscriptionsByConsumerId.remove(consumerInfo.getConsumerId());
mqttSubscriptionByTopic.remove(topicName);
}
return qos[0];
}
- private void resendRetainedMessages(MQTTSubscription mqttSubscription) throws MQTTProtocolException {
-
- ActiveMQDestination destination = mqttSubscription.getDestination();
-
- // check whether the Topic has been recovered in restoreDurableSubs
- // mark subscription available for recovery for duplicate subscription
- if (restoredSubs.remove(destination.getPhysicalName())) {
- return;
- }
-
- String topicName = mqttSubscription.getTopicName();
- // get TopicRegion
- RegionBroker regionBroker;
- try {
- regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
- } catch (Exception e) {
- throw new MQTTProtocolException("Error subscribing to " + topicName + ": " + e.getMessage(), false, e);
- }
- final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
-
- final ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
- final ConsumerId consumerId = consumerInfo.getConsumerId();
-
- // use actual client id used to create connection to lookup connection context
- final String connectionInfoClientId = connectionInfo.getClientId();
- final ConnectionContext connectionContext = regionBroker.getConnectionContext(connectionInfoClientId);
-
- // get all matching Topics
- final Set<org.apache.activemq.broker.region.Destination> matchingDestinations = topicRegion.getDestinations(destination);
- for (org.apache.activemq.broker.region.Destination dest : matchingDestinations) {
-
- // recover retroactive messages for matching subscription
- for (Subscription subscription : dest.getConsumers()) {
- if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) {
- try {
- if (dest instanceof org.apache.activemq.broker.region.Topic) {
- ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription);
- } else if (dest instanceof VirtualTopicInterceptor) {
- ((VirtualTopicInterceptor)dest).getTopic().recoverRetroactiveMessages(connectionContext, subscription);
- }
- if (subscription instanceof PrefetchSubscription) {
- // request dispatch for prefetch subs
- PrefetchSubscription prefetchSubscription = (PrefetchSubscription) subscription;
- prefetchSubscription.dispatchPending();
- }
- } catch (Exception e) {
- throw new MQTTProtocolException("Error recovering retained messages for " +
- dest.getName() + ": " + e.getMessage(), false, e);
- }
- break;
- }
- }
- }
- }
-
- void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException {
+ public void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException {
checkConnected();
UTF8Buffer[] topics = command.topics();
if (topics != null) {
@@ -538,33 +438,38 @@ public class MQTTProtocolConverter {
sendToMQTT(ack.encode());
}
- void onUnSubscribe(String topicName) {
- MQTTSubscription subs = mqttSubscriptionByTopic.remove(topicName);
- if (subs != null) {
- ConsumerInfo info = subs.getConsumerInfo();
- if (info != null) {
- subscriptionsByConsumerId.remove(info.getConsumerId());
- }
- RemoveInfo removeInfo = null;
- if (info != null) {
- removeInfo = info.createRemoveCommand();
- }
- sendToActiveMQ(removeInfo, null);
-
- // check if the durable sub also needs to be removed
- if (subs.getConsumerInfo().getSubscriptionName() != null) {
- // also remove it from restored durable subscriptions set
- restoredSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(topicName));
-
- RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
- rsi.setConnectionId(connectionId);
- rsi.setSubscriptionName(subs.getConsumerInfo().getSubscriptionName());
- rsi.setClientId(connectionInfo.getClientId());
- sendToActiveMQ(rsi, null);
+ public void onUnSubscribe(String topicName) {
+ MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName);
+ if (subscription != null) {
+ doUnSubscribe(subscription);
+
+ // check if the broker side of the subscription needs to be removed
+ try {
+ getSubscriptionStrategy().onUnSubscribe(subscription);
+ } catch (IOException e) {
+ // Ignore
}
}
}
+ public void doUnSubscribe(MQTTSubscription subscription) {
+ mqttSubscriptionByTopic.remove(subscription.getTopicName());
+ ConsumerInfo info = subscription.getConsumerInfo();
+ if (info != null) {
+ subscriptionsByConsumerId.remove(info.getConsumerId());
+ }
+ RemoveInfo removeInfo = null;
+ if (info != null) {
+ removeInfo = info.createRemoveCommand();
+ }
+ sendToActiveMQ(removeInfo, new ResponseHandler() {
+ @Override
+ public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
+ // ignore failures..
+ }
+ });
+ }
+
/**
* Dispatch an ActiveMQ command
*/
@@ -610,7 +515,7 @@ public class MQTTProtocolConverter {
} else if (command.isBrokerInfo()) {
//ignore
} else {
- LOG.debug("Do not know how to process ActiveMQ Command " + command);
+ LOG.debug("Do not know how to process ActiveMQ Command {}", command);
}
}
@@ -647,7 +552,7 @@ public class MQTTProtocolConverter {
ack = publisherRecs.remove(command.messageId());
}
if (ack == null) {
- LOG.warn("Unknown PUBREL: " + command.messageId() + " received");
+ LOG.warn("Unknown PUBREL: {} received", command.messageId());
}
PUBCOMP pubcomp = new PUBCOMP();
pubcomp.messageId(command.messageId());
@@ -680,16 +585,23 @@ public class MQTTProtocolConverter {
msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);
}
- ActiveMQTopic topic;
- synchronized (activeMQTopicMap) {
- topic = activeMQTopicMap.get(command.topicName());
- if (topic == null) {
+ ActiveMQDestination destination;
+ synchronized (activeMQDestinationMap) {
+ destination = activeMQDestinationMap.get(command.topicName());
+ if (destination == null) {
String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString());
- topic = new ActiveMQTopic(topicName);
- activeMQTopicMap.put(command.topicName().toString(), topic);
+
+ try {
+ destination = getSubscriptionStrategy().onSend(topicName);
+ } catch (IOException e) {
+ throw JMSExceptionSupport.create(e);
+ }
+
+ activeMQDestinationMap.put(command.topicName().toString(), destination);
}
}
- msg.setJMSDestination(topic);
+
+ msg.setJMSDestination(destination);
msg.writeBytes(command.payload().data, command.payload().offset, command.payload().length);
return msg;
}
@@ -714,7 +626,8 @@ public class MQTTProtocolConverter {
synchronized (mqttTopicMap) {
topicName = mqttTopicMap.get(message.getJMSDestination());
if (topicName == null) {
- topicName = MQTTProtocolSupport.convertActiveMQToMQTT(message.getDestination().getPhysicalName());
+ String amqTopicName = getSubscriptionStrategy().onSend(message.getDestination());
+ topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName);
mqttTopicMap.put(message.getJMSDestination(), topicName);
}
}
@@ -803,9 +716,7 @@ public class MQTTProtocolConverter {
long keepAliveMS = keepAliveSeconds * 1000;
- if (LOG.isDebugEnabled()) {
- LOG.debug("MQTT Client " + getClientId() + " requests heart beat of " + keepAliveMS + " ms");
- }
+ LOG.debug("MQTT Client {} requests heart beat of {} ms", getClientId(), keepAliveMS);
try {
// if we have a default keep-alive value, and the client is trying to turn off keep-alive,
@@ -822,12 +733,8 @@ public class MQTTProtocolConverter {
monitor.setReadGraceTime(readGracePeriod);
monitor.startMonitorThread();
- if (LOG.isDebugEnabled()) {
- LOG.debug("MQTT Client " + getClientId() +
- " established heart beat of " + keepAliveMS +
- " ms (" + keepAliveMS + "ms + " + readGracePeriod +
- "ms grace period)");
- }
+ LOG.debug("MQTT Client {} established heart beat of {} ms ({} ms + {} ms grace period)",
+ new Object[] { getClientId(), keepAliveMS, keepAliveMS, readGracePeriod });
} catch (Exception ex) {
LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
}
@@ -835,9 +742,7 @@ public class MQTTProtocolConverter {
void handleException(Throwable exception, MQTTFrame command) {
LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Exception detail", exception);
- }
+ LOG.debug("Exception detail", exception);
if (connected.get() && connectionInfo != null) {
connected.set(false);
@@ -861,7 +766,6 @@ public class MQTTProtocolConverter {
}
ResponseHandler createResponseHandler(final PUBLISH command) {
-
if (command != null) {
switch (command.qos()) {
case AT_LEAST_ONCE:
@@ -944,6 +848,22 @@ public class MQTTProtocolConverter {
return connectionId;
}
+ public ConsumerId getNextConsumerId() {
+ return new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+ }
+
+ public boolean isCleanSession() {
+ return this.connect.cleanSession();
+ }
+
+ public String getSubscriptionStrategyName() {
+ return subscriptionStrategyName;
+ }
+
+ public void setSubscriptionStrategyName(String name) {
+ this.subscriptionStrategyName = name;
+ }
+
public String getClientId() {
if (clientId == null) {
if (connect != null && connect.clientId() != null) {
@@ -954,4 +874,33 @@ public class MQTTProtocolConverter {
}
return clientId;
}
+
+ protected MQTTSubscriptionStrategy getSubscriptionStrategy() throws IOException {
+ if (subsciptionStrategy == null) {
+ synchronized (STRATAGY_FINDER) {
+ if (subsciptionStrategy != null) {
+ return subsciptionStrategy;
+ }
+
+ MQTTSubscriptionStrategy strategy = null;
+ if (subscriptionStrategyName != null && !subscriptionStrategyName.isEmpty()) {
+ try {
+ strategy = (MQTTSubscriptionStrategy) STRATAGY_FINDER.newInstance(subscriptionStrategyName);
+ LOG.debug("MQTT Using subscription strategy: {}", subscriptionStrategyName);
+ if (strategy instanceof BrokerServiceAware) {
+ ((BrokerServiceAware)strategy).setBrokerService(brokerService);
+ }
+ strategy.initialize(this);
+ } catch (Exception e) {
+ throw IOExceptionSupport.create(e);
+ }
+ } else {
+ throw new IOException("Invalid subscription strategy name given: " + subscriptionStrategyName);
+ }
+
+ this.subsciptionStrategy = strategy;
+ }
+ }
+ return subsciptionStrategy;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
index bbed655..d265335 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
@@ -132,7 +132,12 @@ public class MQTTSubscription {
/**
* @return the assigned QoS value for this subscription.
*/
- public QoS qos() {
+ public QoS getQoS() {
return qos;
}
+
+ @Override
+ public String toString() {
+ return "MQTT Sub: topic[" + topicName + "] -> [" + consumerInfo.getDestination() + "]";
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
index da23ba5..47afedb 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
@@ -17,13 +17,13 @@
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
-import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
-import org.apache.activemq.broker.BrokerContext;
+import javax.net.ServerSocketFactory;
+
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.MutexTransport;
@@ -33,8 +33,6 @@ import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
-import javax.net.ServerSocketFactory;
-
/**
* A <a href="http://mqtt.org/">MQTT</a> transport factory
*/
@@ -42,16 +40,19 @@ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerS
private BrokerService brokerService = null;
+ @Override
protected String getDefaultWireFormatType() {
return "mqtt";
}
+ @Override
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
- TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory);
+ TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory);
result.setAllowLinkStealing(true);
return result;
}
+ @Override
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new MQTTTransportFilter(transport, format, brokerService);
@@ -59,6 +60,7 @@ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerS
return super.compositeConfigure(transport, format, options);
}
+ @Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
@@ -79,10 +81,8 @@ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerS
@Override
protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
MQTTInactivityMonitor monitor = new MQTTInactivityMonitor(transport, format);
-
MQTTTransportFilter filter = transport.narrow(MQTTTransportFilter.class);
filter.setInactivityMonitor(monitor);
-
return monitor;
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
index 8612c25..ae557ab 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.concurrent.atomic.AtomicBoolean;
+
import javax.jms.JMSException;
import org.apache.activemq.broker.BrokerService;
@@ -29,7 +30,20 @@ import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
-import org.fusesource.mqtt.codec.*;
+import org.fusesource.mqtt.codec.CONNACK;
+import org.fusesource.mqtt.codec.CONNECT;
+import org.fusesource.mqtt.codec.DISCONNECT;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.PINGREQ;
+import org.fusesource.mqtt.codec.PINGRESP;
+import org.fusesource.mqtt.codec.PUBACK;
+import org.fusesource.mqtt.codec.PUBCOMP;
+import org.fusesource.mqtt.codec.PUBLISH;
+import org.fusesource.mqtt.codec.PUBREC;
+import org.fusesource.mqtt.codec.PUBREL;
+import org.fusesource.mqtt.codec.SUBACK;
+import org.fusesource.mqtt.codec.SUBSCRIBE;
+import org.fusesource.mqtt.codec.UNSUBSCRIBE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -197,6 +211,14 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
protocolConverter.setPublishDollarTopics(publishDollarTopics);
}
+    /**
+     * @return the subscription strategy name reported by the protocol converter,
+     *         or "default" when no protocol converter is set.
+     */
+    public String getSubscriptionStrategyName() {
+        if (protocolConverter == null) {
+            return "default";
+        }
+        return protocolConverter.getSubscriptionStrategyName();
+    }
+
+ /**
+ * Hands the given subscription strategy name to the protocol converter.
+ *
+ * @param name
+ * the subscription strategy name to pass on to the protocol converter.
+ */
+ public void setSubscriptionStrategyName(String name) {
+ protocolConverter.setSubscriptionStrategyName(name);
+ }
+
public int getActiveMQSubscriptionPrefetch() {
return protocolConverter.getActiveMQSubscriptionPrefetch();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
new file mode 100644
index 0000000..60259e2
--- /dev/null
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt.strategy;
+
+import java.util.Set;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.TopicRegion;
+import org.apache.activemq.broker.region.virtual.VirtualTopicInterceptor;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
+import org.apache.activemq.transport.mqtt.MQTTProtocolException;
+import org.apache.activemq.transport.mqtt.MQTTSubscription;
+
+/**
+ * Abstract implementation of the {@link MQTTSubscriptionStrategy} interface providing
+ * the base functionality that is common to most implementations.
+ */
+public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscriptionStrategy, BrokerServiceAware {
+
+ protected MQTTProtocolConverter protocol;
+ protected BrokerService brokerService;
+
+ @Override
+ public void initialize(MQTTProtocolConverter protocol) throws MQTTProtocolException {
+ setProtocolConverter(protocol);
+ }
+
+ @Override
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
+
+ @Override
+ public void setProtocolConverter(MQTTProtocolConverter parent) {
+ this.protocol = parent;
+ }
+
+ @Override
+ public MQTTProtocolConverter getProtocolConverter() {
+ return protocol;
+ }
+
+ @Override
+ public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException {
+ String topicName = mqttSubscription.getTopicName();
+
+ // get TopicRegion
+ RegionBroker regionBroker;
+ try {
+ regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
+ } catch (Exception e) {
+ throw new MQTTProtocolException("Error subscribing to " + topicName + ": " + e.getMessage(), false, e);
+ }
+ final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
+
+ final ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
+ final ConsumerId consumerId = consumerInfo.getConsumerId();
+
+ // use actual client id used to create connection to lookup connection
+ // context
+ final String connectionInfoClientId = protocol.getClientId();
+ final ConnectionContext connectionContext = regionBroker.getConnectionContext(connectionInfoClientId);
+
+ // get all matching Topics
+ final Set<org.apache.activemq.broker.region.Destination> matchingDestinations =
+ topicRegion.getDestinations(mqttSubscription.getDestination());
+ for (org.apache.activemq.broker.region.Destination dest : matchingDestinations) {
+
+ // recover retroactive messages for matching subscription
+ for (Subscription subscription : dest.getConsumers()) {
+ if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) {
+ try {
+ if (dest instanceof org.apache.activemq.broker.region.Topic) {
+ ((org.apache.activemq.broker.region.Topic) dest).recoverRetroactiveMessages(connectionContext, subscription);
+ } else if (dest instanceof VirtualTopicInterceptor) {
+ ((VirtualTopicInterceptor) dest).getTopic().recoverRetroactiveMessages(connectionContext, subscription);
+ }
+ if (subscription instanceof PrefetchSubscription) {
+ // request dispatch for prefetch subs
+ PrefetchSubscription prefetchSubscription = (PrefetchSubscription) subscription;
+ prefetchSubscription.dispatchPending();
+ }
+ } catch (Exception e) {
+ throw new MQTTProtocolException("Error recovering retained messages for " + dest.getName() + ": " + e.getMessage(), false, e);
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ @Override
+ public ActiveMQDestination onSend(String topicName) {
+ return new ActiveMQTopic(topicName);
+ }
+
+ @Override
+ public String onSend(ActiveMQDestination destination) {
+ return destination.getPhysicalName();
+ }
+
+ @Override
+ public boolean isControlTopic(ActiveMQDestination destination) {
+ return destination.getPhysicalName().startsWith("$");
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
new file mode 100644
index 0000000..4e8f362
--- /dev/null
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt.strategy;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.store.PersistenceAdapterSupport;
+import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
+import org.apache.activemq.transport.mqtt.MQTTProtocolException;
+import org.apache.activemq.transport.mqtt.MQTTProtocolSupport;
+import org.apache.activemq.transport.mqtt.MQTTSubscription;
+import org.apache.activemq.transport.mqtt.ResponseHandler;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.fusesource.mqtt.codec.CONNECT;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation that uses unmapped topic subscriptions. Subscriptions
+ * that qualify as durable are given a subscription name of the form
+ * {@code <QoS>:<MQTT topic name>}, which is parsed again on connect to restore them.
+ */
+public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTDefaultSubscriptionStrategy.class);
+
+    // Shared handler for best-effort removals whose outcome is deliberately ignored.
+    private static final ResponseHandler IGNORE_RESPONSE = new ResponseHandler() {
+        @Override
+        public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
+            // ignore failures..
+        }
+    };
+
+    // ActiveMQ-format names of the topics whose durable subscription was restored on
+    // connect; entries are keyed the same way onReSubscribe and onUnSubscribe look them up.
+    private final Set<String> restoredSubs = Collections.synchronizedSet(new HashSet<String>());
+
+    /**
+     * Loads the durable subscriptions stored for the client id and either deletes
+     * them (clean session) or restores them.
+     *
+     * @param connect
+     *        the CONNECT frame sent by the client.
+     *
+     * @throws MQTTProtocolException if the stored subscriptions cannot be listed.
+     */
+    @Override
+    public void onConnect(CONNECT connect) throws MQTTProtocolException {
+        List<SubscriptionInfo> subs;
+        try {
+            subs = PersistenceAdapterSupport.listSubscriptions(brokerService.getPersistenceAdapter(), protocol.getClientId());
+        } catch (IOException e) {
+            throw new MQTTProtocolException("Error loading store subscriptions", true, e);
+        }
+        if (connect.cleanSession()) {
+            deleteDurableSubs(subs);
+        } else {
+            restoreDurableSubs(subs);
+        }
+    }
+
+    /**
+     * Creates a retroactive topic consumer for the requested MQTT topic; the
+     * consumer is given a subscription name only when the session is not clean,
+     * a client id is present and the requested QoS is at least AT_LEAST_ONCE.
+     *
+     * @param topicName
+     *        the MQTT topic name to subscribe to.
+     * @param requestedQoS
+     *        the QoS level requested by the client.
+     *
+     * @return the value returned by the protocol converter's doSubscribe call.
+     *
+     * @throws MQTTProtocolException if the subscribe fails.
+     */
+    @Override
+    public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException {
+        ActiveMQDestination destination = new ActiveMQTopic(MQTTProtocolSupport.convertMQTTToActiveMQ(topicName));
+
+        ConsumerInfo consumerInfo = new ConsumerInfo(protocol.getNextConsumerId());
+        consumerInfo.setDestination(destination);
+        consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
+        consumerInfo.setRetroactive(true);
+        consumerInfo.setDispatchAsync(true);
+        // create durable subscriptions only when clean session is false
+        if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
+            consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName);
+        }
+
+        return protocol.doSubscribe(consumerInfo, topicName, requestedQoS);
+    }
+
+    /**
+     * Skips the message recovery once for a subscription that was just restored
+     * on connect; otherwise defers to the base class implementation.
+     */
+    @Override
+    public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException {
+
+        ActiveMQDestination destination = mqttSubscription.getDestination();
+
+        // check whether the Topic has been recovered in restoreDurableSubs
+        // mark subscription available for recovery for duplicate subscription
+        if (restoredSubs.remove(destination.getPhysicalName())) {
+            return;
+        }
+
+        super.onReSubscribe(mqttSubscription);
+    }
+
+    /**
+     * Removes the durable subscription behind the given MQTT subscription, if it
+     * has one, and forgets any restored marker kept for its topic.
+     */
+    @Override
+    public void onUnSubscribe(MQTTSubscription subscription) throws MQTTProtocolException {
+        // check if the durable sub also needs to be removed
+        String durableName = subscription.getConsumerInfo().getSubscriptionName();
+        if (durableName != null) {
+            // also remove it from restored durable subscriptions set
+            restoredSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName()));
+
+            RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+            rsi.setConnectionId(protocol.getConnectionId());
+            rsi.setSubscriptionName(durableName);
+            rsi.setClientId(protocol.getClientId());
+            protocol.sendToActiveMQ(rsi, IGNORE_RESPONSE);
+        }
+    }
+
+    /**
+     * Sends a remove request for every stored subscription. This is best effort:
+     * any failure is logged and not propagated.
+     */
+    private void deleteDurableSubs(List<SubscriptionInfo> subs) {
+        try {
+            for (SubscriptionInfo sub : subs) {
+                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+                rsi.setConnectionId(protocol.getConnectionId());
+                rsi.setSubscriptionName(sub.getSubcriptionName());
+                rsi.setClientId(sub.getClientId());
+                protocol.sendToActiveMQ(rsi, IGNORE_RESPONSE);
+            }
+        } catch (Throwable e) {
+            LOG.warn("Could not delete the MQTT durable subs.", e);
+        }
+    }
+
+    /**
+     * Re-creates a subscription for every stored durable subscription whose name
+     * has the {@code <QoS>:<MQTT topic name>} form and marks it as restored.
+     * Stored subscriptions with any other name are skipped with a warning rather
+     * than failing the connect.
+     */
+    private void restoreDurableSubs(List<SubscriptionInfo> subs) {
+        try {
+            for (SubscriptionInfo sub : subs) {
+                String name = sub.getSubcriptionName();
+                String[] split = name.split(":", 2);
+                if (split.length != 2) {
+                    LOG.warn("Skipping durable sub with unrecognized name: {}", name);
+                    continue;
+                }
+
+                QoS qoS;
+                try {
+                    qoS = QoS.valueOf(split[0]);
+                } catch (IllegalArgumentException e) {
+                    LOG.warn("Skipping durable sub with unrecognized QoS in name: {}", name);
+                    continue;
+                }
+
+                protocol.onSubscribe(new Topic(split[1], qoS));
+                // mark this durable subscription as restored by Broker; the marker is
+                // stored in ActiveMQ format because onReSubscribe and onUnSubscribe
+                // remove it using the converted destination name
+                restoredSubs.add(MQTTProtocolSupport.convertMQTTToActiveMQ(split[1]));
+            }
+        } catch (IOException e) {
+            LOG.warn("Could not restore the MQTT durable subs.", e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTSubscriptionStrategy.java
new file mode 100644
index 0000000..0eaa72a
--- /dev/null
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTSubscriptionStrategy.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt.strategy;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
+import org.apache.activemq.transport.mqtt.MQTTProtocolException;
+import org.apache.activemq.transport.mqtt.MQTTSubscription;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.codec.CONNECT;
+
+/**
+ * Strategy that controls how MQTT client subscriptions are created on the
+ * broker and how published messages are addressed so that they arrive on
+ * the destinations those subscriptions consume from.
+ */
+public interface MQTTSubscriptionStrategy {
+
+    /**
+     * Prepares the strategy for use; called before any other operation.
+     *
+     * @param protocol
+     *        the MQTTProtocolConverter that is initializing the strategy
+     *
+     * @throws MQTTProtocolException if an error occurs during initialization.
+     */
+    void initialize(MQTTProtocolConverter protocol) throws MQTTProtocolException;
+
+    /**
+     * Gives the strategy the chance to act when a client connects, before the
+     * CONNACK frame is sent back, for example to recover old subscriptions or
+     * to perform clean session handling.
+     *
+     * @param connect
+     *        the CONNECT frame received from the client.
+     *
+     * @throws MQTTProtocolException if an error occurs while processing the connect actions.
+     */
+    void onConnect(CONNECT connect) throws MQTTProtocolException;
+
+    /**
+     * Invoked when a new subscription is requested. The strategy decides which
+     * kind of subscription is created for the client, for example by mapping a
+     * topic subscription onto a Queue.
+     *
+     * @param topicName
+     *        the requested Topic name to subscribe to.
+     * @param requestedQoS
+     *        the QoS level that the client has requested for this subscription.
+     *
+     * @return the assigned QoS value given to the new subscription
+     *
+     * @throws MQTTProtocolException if an error occurs while processing the subscribe actions.
+     */
+    byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException;
+
+    /**
+     * Invoked when a client repeats a subscribe request it has already made.
+     * Retained messages on that topic should be replayed as though the client
+     * had just subscribed for the first time. Implementations should not
+     * unsubscribe the client, because it could miss messages sent while the
+     * subscription is being recreated.
+     *
+     * @param subscription
+     *        the MQTTSubscription that contains the subscription state.
+     *
+     * @throws MQTTProtocolException if an error occurs while processing the re-subscribe actions.
+     */
+    void onReSubscribe(MQTTSubscription subscription) throws MQTTProtocolException;
+
+    /**
+     * Invoked when a client asks to remove one of its subscriptions.
+     *
+     * @param subscription
+     *        the {@link MQTTSubscription} that is being removed.
+     *
+     * @throws MQTTProtocolException if an error occurs during the un-subscribe processing.
+     */
+    void onUnSubscribe(MQTTSubscription subscription) throws MQTTProtocolException;
+
+    /**
+     * Maps the target of a PUBLISH sent by the client onto the destination the
+     * message must be sent to so that it reaches the destinations this strategy
+     * has mapped subscribe requests to.
+     *
+     * @param topicName
+     *        the targeted Topic that the client sent the message to.
+     *
+     * @return an ActiveMQ destination that lands the send in the correct destinations.
+     */
+    ActiveMQDestination onSend(String topicName);
+
+    /**
+     * Maps the destination of a message dispatched by the broker onto the topic
+     * name that is reported to the client, so that the client sees a valid
+     * Topic name.
+     *
+     * @param destination
+     *        the destination that the message was dispatched from
+     *
+     * @return a Topic name that is valid for the receiving client.
+     */
+    String onSend(ActiveMQDestination destination);
+
+    /**
+     * Tells the protocol handler whether a destination corresponds to an MQTT
+     * control topic (one whose name starts with $). Because a strategy may alter
+     * the naming scheme of the destinations it maps to, it must be able to map a
+     * name back and decide whether the destination was originally an MQTT
+     * control topic.
+     *
+     * @param destination
+     *        the destination to query.
+     *
+     * @return true if the destination is an MQTT control topic.
+     */
+    boolean isControlTopic(ActiveMQDestination destination);
+
+    /**
+     * Sets the {@link MQTTProtocolConverter} that is the parent of this strategy object.
+     *
+     * @param parent
+     *        the {@link MQTTProtocolConverter} that owns this strategy.
+     */
+    void setProtocolConverter(MQTTProtocolConverter parent);
+
+    /**
+     * @return the {@link MQTTProtocolConverter} that owns this strategy.
+     */
+    MQTTProtocolConverter getProtocolConverter();
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
new file mode 100644
index 0000000..64995c6
--- /dev/null
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt.strategy;
+
+import static org.apache.activemq.transport.mqtt.MQTTProtocolSupport.convertActiveMQToMQTT;
+import static org.apache.activemq.transport.mqtt.MQTTProtocolSupport.convertMQTTToActiveMQ;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.store.PersistenceAdapterSupport;
+import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
+import org.apache.activemq.transport.mqtt.MQTTProtocolException;
+import org.apache.activemq.transport.mqtt.MQTTSubscription;
+import org.apache.activemq.transport.mqtt.ResponseHandler;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.codec.CONNECT;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Subscription strategy that converts all MQTT subscribes that would be durable to
+ * Virtual Topic Queue subscriptions. Also maps all publish requests to be prefixed
+ * with the VirtualTopic. prefix unless already present.
+ * <p>
+ * The queue backing a durable subscription is named
+ * {@code Consumer.<clientId>:<QoS>.VirtualTopic.<converted topic name>}.
+ */
+public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy {
+
+    private static final String VIRTUALTOPIC_PREFIX = "VirtualTopic.";
+    private static final String VIRTUALTOPIC_CONSUMER_PREFIX = "Consumer.";
+
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTVirtualTopicSubscriptionStrategy.class);
+
+    // Shared handler for best-effort removals whose outcome is deliberately ignored.
+    private static final ResponseHandler IGNORE_RESPONSE = new ResponseHandler() {
+        @Override
+        public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
+            // ignore failures..
+        }
+    };
+
+    // Queues whose subscription was re-created on connect and has not been re-subscribed since.
+    private final Set<ActiveMQQueue> restoredQueues = Collections.synchronizedSet(new HashSet<ActiveMQQueue>());
+
+    /**
+     * Finds the stored queues that belong to this client and either removes them
+     * (clean session) or re-creates a subscription for each of them.
+     *
+     * @param connect
+     *        the CONNECT frame sent by the client.
+     *
+     * @throws MQTTProtocolException if the stored queues cannot be listed.
+     */
+    @Override
+    public void onConnect(CONNECT connect) throws MQTTProtocolException {
+        // The ':' that terminates the client id is part of the prefix so that a client
+        // whose id is a prefix of another client's id does not pick up (and, for a
+        // clean session, delete) that other client's queues.
+        final String queuePrefix = getConsumerQueuePrefix();
+
+        List<ActiveMQQueue> queues;
+        try {
+            queues = PersistenceAdapterSupport.listQueues(brokerService.getPersistenceAdapter(), new PersistenceAdapterSupport.DestinationMatcher() {
+
+                @Override
+                public boolean matches(ActiveMQDestination destination) {
+                    if (destination.getPhysicalName().startsWith(queuePrefix)) {
+                        LOG.debug("Recovered client sub: {}", destination.getPhysicalName());
+                        return true;
+                    }
+                    return false;
+                }
+            });
+        } catch (IOException e) {
+            throw new MQTTProtocolException("Error restoring durable subscriptions", true, e);
+        }
+
+        if (connect.cleanSession()) {
+            deleteDurableQueues(queues);
+        } else {
+            restoreDurableQueue(queues);
+        }
+    }
+
+    /**
+     * Subscribes to a per-client consumer queue when the session is not clean, a
+     * client id is present and the requested QoS is at least AT_LEAST_ONCE;
+     * otherwise subscribes to the VirtualTopic.-prefixed topic.
+     *
+     * @param topicName
+     *        the MQTT topic name to subscribe to.
+     * @param requestedQoS
+     *        the QoS level requested by the client.
+     *
+     * @return the value returned by the protocol converter's doSubscribe call.
+     *
+     * @throws MQTTProtocolException if the subscribe fails.
+     */
+    @Override
+    public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException {
+        final String converted = convertMQTTToActiveMQ(topicName);
+
+        ActiveMQDestination destination;
+        if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
+            destination = new ActiveMQQueue(getConsumerQueuePrefix() + requestedQoS + "." + VIRTUALTOPIC_PREFIX + converted);
+        } else if (converted.startsWith(VIRTUALTOPIC_PREFIX)) {
+            destination = new ActiveMQTopic(converted);
+        } else {
+            destination = new ActiveMQTopic(VIRTUALTOPIC_PREFIX + converted);
+        }
+
+        ConsumerInfo consumerInfo = new ConsumerInfo(protocol.getNextConsumerId());
+        consumerInfo.setDestination(destination);
+        consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
+        consumerInfo.setRetroactive(true);
+        consumerInfo.setDispatchAsync(true);
+
+        return protocol.doSubscribe(consumerInfo, topicName, requestedQoS);
+    }
+
+    /**
+     * Handles a duplicate subscribe. A queue that was just restored on connect is
+     * skipped once; a topic subscription is handled by the base class; a queue
+     * subscription is dropped and subscribed again under a new consumer id.
+     */
+    @Override
+    public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException {
+
+        ActiveMQDestination destination = mqttSubscription.getDestination();
+
+        // check whether the Queue has been recovered in restoreDurableQueue
+        // mark subscription available for recovery for duplicate subscription
+        if (restoredQueues.remove(destination)) {
+            return;
+        }
+
+        if (destination.isTopic()) {
+            super.onReSubscribe(mqttSubscription);
+        } else {
+            protocol.doUnSubscribe(mqttSubscription);
+            ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
+            consumerInfo.setConsumerId(protocol.getNextConsumerId());
+            protocol.doSubscribe(consumerInfo, mqttSubscription.getTopicName(), mqttSubscription.getQoS());
+        }
+    }
+
+    /**
+     * Requests removal of the consumer queue when the subscription being removed
+     * is backed by a queue; topic subscriptions need no extra work here.
+     */
+    @Override
+    public void onUnSubscribe(MQTTSubscription subscription) throws MQTTProtocolException {
+        if (subscription.getDestination().isQueue()) {
+            DestinationInfo remove = new DestinationInfo();
+            remove.setConnectionId(protocol.getConnectionId());
+            remove.setDestination(subscription.getDestination());
+            remove.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
+
+            protocol.sendToActiveMQ(remove, IGNORE_RESPONSE);
+        }
+    }
+
+    /**
+     * Prefixes the published topic name with VirtualTopic. unless it already
+     * starts with that prefix.
+     */
+    @Override
+    public ActiveMQDestination onSend(String topicName) {
+        if (!topicName.startsWith(VIRTUALTOPIC_PREFIX)) {
+            return new ActiveMQTopic(VIRTUALTOPIC_PREFIX + topicName);
+        } else {
+            return new ActiveMQTopic(topicName);
+        }
+    }
+
+    /**
+     * Strips a leading VirtualTopic. prefix from the destination's physical name.
+     */
+    @Override
+    public String onSend(ActiveMQDestination destination) {
+        String amqTopicName = destination.getPhysicalName();
+        if (amqTopicName.startsWith(VIRTUALTOPIC_PREFIX)) {
+            amqTopicName = amqTopicName.substring(VIRTUALTOPIC_PREFIX.length());
+        }
+        return amqTopicName;
+    }
+
+    /**
+     * A destination is a control topic when its physical name starts with "$",
+     * either directly or after the VirtualTopic. prefix.
+     */
+    @Override
+    public boolean isControlTopic(ActiveMQDestination destination) {
+        String destinationName = destination.getPhysicalName();
+        return destinationName.startsWith("$") || destinationName.startsWith(VIRTUALTOPIC_PREFIX + "$");
+    }
+
+    /**
+     * @return the name prefix shared by every consumer queue of the current
+     *         client: {@code Consumer.<clientId>:}.
+     */
+    private String getConsumerQueuePrefix() {
+        return VIRTUALTOPIC_CONSUMER_PREFIX + protocol.getClientId() + ":";
+    }
+
+    /**
+     * Sends a remove request for every given queue. This is best effort: any
+     * failure is logged and not propagated.
+     */
+    private void deleteDurableQueues(List<ActiveMQQueue> queues) {
+        try {
+            for (ActiveMQQueue queue : queues) {
+                DestinationInfo removeAction = new DestinationInfo();
+                removeAction.setConnectionId(protocol.getConnectionId());
+                removeAction.setDestination(queue);
+                removeAction.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
+
+                protocol.sendToActiveMQ(removeAction, IGNORE_RESPONSE);
+            }
+        } catch (Throwable e) {
+            LOG.warn("Could not delete the MQTT durable subs.", e);
+        }
+    }
+
+    /**
+     * Re-creates a subscription for every given queue whose name has the
+     * {@code Consumer.<clientId>:<QoS>.VirtualTopic.<topic>} form and marks the
+     * queue as restored. The known client prefix is stripped instead of splitting
+     * on delimiters, so a client id that contains '.' is parsed correctly. Queues
+     * whose name does not have the expected form are skipped with a warning
+     * rather than failing the connect.
+     */
+    private void restoreDurableQueue(List<ActiveMQQueue> queues) {
+        final String queuePrefix = getConsumerQueuePrefix();
+        try {
+            for (ActiveMQQueue queue : queues) {
+                String physicalName = queue.getPhysicalName();
+                if (!physicalName.startsWith(queuePrefix)) {
+                    LOG.warn("Skipping queue that does not belong to this client: {}", physicalName);
+                    continue;
+                }
+
+                // what is left has the form <QoS>.VirtualTopic.<topic>
+                String remainder = physicalName.substring(queuePrefix.length());
+                int qosEnd = remainder.indexOf('.');
+                if (qosEnd < 0 || !remainder.startsWith(VIRTUALTOPIC_PREFIX, qosEnd + 1)) {
+                    LOG.warn("Skipping queue with unrecognized subscription name: {}", physicalName);
+                    continue;
+                }
+
+                QoS qoS;
+                try {
+                    qoS = QoS.valueOf(remainder.substring(0, qosEnd));
+                } catch (IllegalArgumentException e) {
+                    LOG.warn("Skipping queue with unrecognized QoS in name: {}", physicalName);
+                    continue;
+                }
+
+                String topicName = convertActiveMQToMQTT(remainder.substring(qosEnd + 1 + VIRTUALTOPIC_PREFIX.length()));
+                LOG.trace("Restoring subscription: {}:{}", topicName, qoS);
+
+                ConsumerInfo consumerInfo = new ConsumerInfo(protocol.getNextConsumerId());
+                consumerInfo.setDestination(queue);
+                consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
+                consumerInfo.setRetroactive(true);
+                consumerInfo.setDispatchAsync(true);
+
+                protocol.doSubscribe(consumerInfo, topicName, qoS);
+
+                // mark this durable subscription as restored by Broker
+                restoredQueues.add(queue);
+            }
+        } catch (IOException e) {
+            LOG.warn("Could not restore the MQTT durable subs.", e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/resources/META-INF/services/org/apache/activemq/transport/strategies/mqtt-default-subscriptions
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/resources/META-INF/services/org/apache/activemq/transport/strategies/mqtt-default-subscriptions b/activemq-mqtt/src/main/resources/META-INF/services/org/apache/activemq/transport/strategies/mqtt-default-subscriptions
new file mode 100644
index 0000000..006e0ba
--- /dev/null
+++ b/activemq-mqtt/src/main/resources/META-INF/services/org/apache/activemq/transport/strategies/mqtt-default-subscriptions
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.mqtt.strategy.MQTTDefaultSubscriptionStrategy
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/resources/META-INF/services/org/apache/activemq/transport/strategies/mqtt-virtual-topic-subscriptions
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/resources/META-INF/services/org/apache/activemq/transport/strategies/mqtt-virtual-topic-subscriptions b/activemq-mqtt/src/main/resources/META-INF/services/org/apache/activemq/transport/strategies/mqtt-virtual-topic-subscriptions
new file mode 100644
index 0000000..893b0b3
--- /dev/null
+++ b/activemq-mqtt/src/main/resources/META-INF/services/org/apache/activemq/transport/strategies/mqtt-virtual-topic-subscriptions
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.mqtt.strategy.MQTTVirtualTopicSubscriptionStrategy
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index c4571dc..3b4062d 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -62,6 +62,7 @@ import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.PUBLISH;
+import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -153,7 +154,7 @@ public class MQTTTest extends MQTTTestSupport {
publishProvider.publish(topic, payload.getBytes(), AT_LEAST_ONCE);
}
- latch.await(10, TimeUnit.SECONDS);
+ latch.await(20, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
subscriptionProvider.disconnect();
publishProvider.disconnect();
@@ -488,13 +489,83 @@ public class MQTTTest extends MQTTTestSupport {
@Test(timeout = 120 * 1000)
public void testRetainedMessage() throws Exception {
MQTT mqtt = createMQTTConnection();
- mqtt.setKeepAlive((short) 2);
+ mqtt.setKeepAlive((short) 60);
final String RETAIN = "RETAIN";
final String TOPICA = "TopicA";
final String[] clientIds = { null, "foo", "durable" };
for (String clientId : clientIds) {
+ LOG.info("Testing now with Client ID: {}", clientId);
+
+ mqtt.setClientId(clientId);
+ mqtt.setCleanSession(!"durable".equals(clientId));
+
+ BlockingConnection connection = mqtt.blockingConnection();
+ connection.connect();
+
+ // set retained message and check
+ connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
+ connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+ Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+ assertNotNull("No retained message for " + clientId, msg);
+ assertEquals(RETAIN, new String(msg.getPayload()));
+ msg.ack();
+ assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+
+ // test duplicate subscription
+ connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+ msg = connection.receive(15000, TimeUnit.MILLISECONDS);
+ assertNotNull("No retained message on duplicate subscription for " + clientId, msg);
+ assertEquals(RETAIN, new String(msg.getPayload()));
+ msg.ack();
+ assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+ connection.unsubscribe(new String[]{TOPICA});
+
+ // clear retained message and check that we don't receive it
+ connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true);
+ connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+ msg = connection.receive(500, TimeUnit.MILLISECONDS);
+ assertNull("Retained message not cleared for " + clientId, msg);
+ connection.unsubscribe(new String[]{TOPICA});
+
+ // set retained message again and check
+ connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
+ connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+ msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+ assertNotNull("No reset retained message for " + clientId, msg);
+ assertEquals(RETAIN, new String(msg.getPayload()));
+ msg.ack();
+ assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+
+ // re-connect and check
+ connection.disconnect();
+ connection = mqtt.blockingConnection();
+ connection.connect();
+ connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+ msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+ assertNotNull("No reset retained message for " + clientId, msg);
+ assertEquals(RETAIN, new String(msg.getPayload()));
+ msg.ack();
+ assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+
+ connection.unsubscribe(new String[]{TOPICA});
+ connection.disconnect();
+ }
+ }
+
+ @Ignore
+ @Test(timeout = 120 * 1000)
+ public void testRetainedMessageOnVirtualTopics() throws Exception {
+ MQTT mqtt = createMQTTConnection();
+ mqtt.setKeepAlive((short) 60);
+
+ final String RETAIN = "RETAIN";
+ final String TOPICA = "VirtualTopic/TopicA";
+
+ final String[] clientIds = { null, "foo", "durable" };
+ for (String clientId : clientIds) {
+ LOG.info("Testing now with Client ID: {}", clientId);
mqtt.setClientId(clientId);
mqtt.setCleanSession(!"durable".equals(clientId));
@@ -547,6 +618,7 @@ public class MQTTTest extends MQTTTestSupport {
msg.ack();
assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+ LOG.info("Test now unsubscribing from: {} for the last time", TOPICA);
connection.unsubscribe(new String[]{TOPICA});
connection.disconnect();
}
@@ -914,9 +986,12 @@ public class MQTTTest extends MQTTTestSupport {
@Test(timeout = 60 * 1000)
public void testSendMQTTReceiveJMS() throws Exception {
+ doTestSendMQTTReceiveJMS("foo.*");
+ }
+
+ public void doTestSendMQTTReceiveJMS(String destinationName) throws Exception {
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
- final String DESTINATION_NAME = "foo.*";
// send retained message
final String RETAINED = "RETAINED";
@@ -927,7 +1002,7 @@ public class MQTTTest extends MQTTTestSupport {
activeMQConnection.setUseRetroactiveConsumer(true);
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME);
+ javax.jms.Topic jmsTopic = s.createTopic(destinationName);
MessageConsumer consumer = s.createConsumer(jmsTopic);
// check whether we received retained message on JMS subscribe
@@ -952,6 +1027,10 @@ public class MQTTTest extends MQTTTestSupport {
@Test(timeout = 2 * 60 * 1000)
public void testSendJMSReceiveMQTT() throws Exception {
+ doTestSendJMSReceiveMQTT("foo.far");
+ }
+
+ public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception {
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);
@@ -959,7 +1038,7 @@ public class MQTTTest extends MQTTTestSupport {
activeMQConnection.setUseRetroactiveConsumer(true);
activeMQConnection.start();
Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- javax.jms.Topic jmsTopic = s.createTopic("foo.far");
+ javax.jms.Topic jmsTopic = s.createTopic(destinationName);
MessageProducer producer = s.createProducer(jmsTopic);
// send retained message from JMS
@@ -1130,10 +1209,14 @@ public class MQTTTest extends MQTTTestSupport {
@Test(timeout = 30 * 10000)
public void testJmsMapping() throws Exception {
+ doTestJmsMapping("test.foo");
+ }
+
+ public void doTestJmsMapping(String destinationName) throws Exception {
// start up jms consumer
Connection jmsConn = cf.createConnection();
Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination dest = session.createTopic("test.foo");
+ Destination dest = session.createTopic(destinationName);
MessageConsumer consumer = session.createConsumer(dest);
jmsConn.start();
http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
new file mode 100644
index 0000000..6605f53
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Runs the basic MQTT tests with the "mqtt-virtual-topic-subscriptions" subscription strategy configured on the transport.
+ */
+public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest {
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ protocolConfig = "transport.subscriptionStrategyName=mqtt-virtual-topic-subscriptions"; // set before the parent set-up runs; presumably read there when the transport is created
+ super.setUp();
+ }
+
+ // TODO - This currently fails on the durable case because we have a hard time
+ // recovering the original Topic name when a client tries to subscribe
+ // durable to a VirtualTopic.* type topic.
+ @Override
+ @Ignore
+ public void testRetainedMessageOnVirtualTopics() throws Exception {} // overridden with an empty body so the inherited test does nothing under this strategy
+
+ @Override
+ @Test(timeout = 60 * 1000)
+ public void testSendMQTTReceiveJMS() throws Exception {
+ doTestSendMQTTReceiveJMS("VirtualTopic.foo.*"); // parent test passes "foo.*"; here the JMS topic carries the VirtualTopic. prefix
+ }
+
+ @Override
+ @Test(timeout = 2 * 60 * 1000)
+ public void testSendJMSReceiveMQTT() throws Exception {
+ doTestSendJMSReceiveMQTT("VirtualTopic.foo.far"); // parent test passes "foo.far"
+ }
+
+ @Override
+ @Test(timeout = 30 * 10000)
+ public void testJmsMapping() throws Exception {
+ doTestJmsMapping("VirtualTopic.test.foo"); // parent test passes "test.foo"
+ }
+}