You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/07/09 16:18:10 UTC
[4/5] activemq-artemis git commit: Added Initial MQTT Protocol Support
Added Initial MQTT Protocol Support
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0f82ca75
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0f82ca75
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0f82ca75
Branch: refs/heads/master
Commit: 0f82ca754bebcd739dfcce37707bc1d0c4b132ef
Parents: 077e9e2
Author: Martyn Taylor <mt...@redhat.com>
Authored: Mon Jul 6 17:01:08 2015 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Thu Jul 9 11:08:04 2015 +0100
----------------------------------------------------------------------
artemis-distribution/pom.xml | 9 +
artemis-distribution/src/main/assembly/dep.xml | 2 +
artemis-protocols/artemis-mqtt-protocol/pom.xml | 56 +
.../core/protocol/mqtt/MQTTConnection.java | 241 +++
.../protocol/mqtt/MQTTConnectionManager.java | 205 ++
.../core/protocol/mqtt/MQTTFailureListener.java | 47 +
.../artemis/core/protocol/mqtt/MQTTLogger.java | 43 +
.../core/protocol/mqtt/MQTTMessageInfo.java | 57 +
.../core/protocol/mqtt/MQTTProtocolHandler.java | 362 ++++
.../core/protocol/mqtt/MQTTProtocolManager.java | 140 ++
.../mqtt/MQTTProtocolManagerFactory.java | 58 +
.../core/protocol/mqtt/MQTTPublishManager.java | 270 +++
.../protocol/mqtt/MQTTRetainMessageManager.java | 98 +
.../artemis/core/protocol/mqtt/MQTTSession.java | 173 ++
.../core/protocol/mqtt/MQTTSessionCallback.java | 111 ++
.../core/protocol/mqtt/MQTTSessionState.java | 250 +++
.../protocol/mqtt/MQTTSubscriptionManager.java | 183 ++
.../artemis/core/protocol/mqtt/MQTTUtil.java | 178 ++
...mis.spi.core.protocol.ProtocolManagerFactory | 1 +
artemis-protocols/pom.xml | 1 +
pom.xml | 23 +
tests/integration-tests/pom.xml | 37 +
.../mqtt/imported/FuseMQTTClientProvider.java | 131 ++
.../mqtt/imported/MQTTClientProvider.java | 48 +
.../integration/mqtt/imported/MQTTTest.java | 1747 ++++++++++++++++++
.../mqtt/imported/MQTTTestSupport.java | 376 ++++
.../integration/mqtt/imported/PahoMQTTTest.java | 175 ++
.../util/ResourceLoadingSslContext.java | 284 +++
.../integration/mqtt/imported/util/Wait.java | 56 +
29 files changed, 5362 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-distribution/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-distribution/pom.xml b/artemis-distribution/pom.xml
index ebde324..d97f1a5 100644
--- a/artemis-distribution/pom.xml
+++ b/artemis-distribution/pom.xml
@@ -124,6 +124,11 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
+ <artifactId>artemis-mqtt-protocol</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
<artifactId>artemis-native</artifactId>
<version>${project.version}</version>
</dependency>
@@ -176,6 +181,10 @@
<version>${project.version}</version>
<classifier>javadoc</classifier>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-mqtt</artifactId>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-distribution/src/main/assembly/dep.xml
----------------------------------------------------------------------
diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml
index febc8ec..084d248 100644
--- a/artemis-distribution/src/main/assembly/dep.xml
+++ b/artemis-distribution/src/main/assembly/dep.xml
@@ -60,6 +60,7 @@
<include>org.apache.activemq:artemis-proton-plug</include>
<include>org.apache.activemq:artemis-hornetq-protocol</include>
<include>org.apache.activemq:artemis-stomp-protocol</include>
+ <include>org.apache.activemq:artemis-mqtt-protocol</include>
<include>org.apache.activemq:artemis-ra</include>
<include>org.apache.activemq:artemis-selector</include>
<include>org.apache.activemq:artemis-server</include>
@@ -86,6 +87,7 @@
<include>commons-collections:commons-collections</include>
<include>org.fusesource.hawtbuf:hawtbuf</include>
<include>org.jgroups:jgroups</include>
+ <include>io.netty:netty-codec-mqtt</include>
</includes>
<!--excludes>
<exclude>org.apache.activemq:artemis-website</exclude>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/pom.xml b/artemis-protocols/artemis-mqtt-protocol/pom.xml
new file mode 100644
index 0000000..d566d88
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/pom.xml
@@ -0,0 +1,56 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>artemis-protocols</artifactId>
+ <groupId>org.apache.activemq</groupId>
+ <version>1.0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>artemis-mqtt-protocol</artifactId>
+
+ <properties>
+ <activemq.basedir>${project.basedir}/../..</activemq.basedir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.jboss.logging</groupId>
+ <artifactId>jboss-logging-processor</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.logging</groupId>
+ <artifactId>jboss-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>artemis-server</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-mqtt</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
new file mode 100644
index 0000000..08dd157
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+
+/**
+ * {@link RemotingConnection} implementation used for MQTT clients. It wraps the underlying transport
+ * {@link Connection}, keeps the registered failure/close listeners and tracks a few simple flags
+ * (data received, destroyed, connected).
+ */
+public class MQTTConnection implements RemotingConnection
+{
+   private final Connection transportConnection;
+
+   private final long creationTime;
+
+   private final AtomicBoolean dataReceived;
+
+   // Flags are written and read without any lock, so they are volatile to make updates visible.
+   private volatile boolean destroyed;
+
+   private volatile boolean connected;
+
+   private final List<FailureListener> failureListeners = Collections.synchronizedList(new ArrayList<FailureListener>());
+
+   private final List<CloseListener> closeListeners = Collections.synchronizedList(new ArrayList<CloseListener>());
+
+   /**
+    * @param transportConnection the transport connection all I/O related calls are delegated to
+    */
+   public MQTTConnection(Connection transportConnection) throws Exception
+   {
+      this.transportConnection = transportConnection;
+      this.creationTime = System.currentTimeMillis();
+      this.dataReceived = new AtomicBoolean();
+      this.destroyed = false;
+   }
+
+   /**
+    * @return the ID of the underlying transport connection
+    */
+   @Override
+   public Object getID()
+   {
+      return transportConnection.getID();
+   }
+
+   /**
+    * @return the value of {@link System#currentTimeMillis()} captured when this object was constructed
+    */
+   @Override
+   public long getCreationTime()
+   {
+      return creationTime;
+   }
+
+   @Override
+   public String getRemoteAddress()
+   {
+      return transportConnection.getRemoteAddress();
+   }
+
+   @Override
+   public void addFailureListener(FailureListener listener)
+   {
+      failureListeners.add(listener);
+   }
+
+   @Override
+   public boolean removeFailureListener(FailureListener listener)
+   {
+      return failureListeners.remove(listener);
+   }
+
+   @Override
+   public void addCloseListener(CloseListener listener)
+   {
+      closeListeners.add(listener);
+   }
+
+   @Override
+   public boolean removeCloseListener(CloseListener listener)
+   {
+      return closeListeners.remove(listener);
+   }
+
+   /**
+    * Removes every registered close listener.
+    *
+    * @return a copy of the listeners that were registered before the call
+    */
+   @Override
+   public List<CloseListener> removeCloseListeners()
+   {
+      synchronized (closeListeners)
+      {
+         List<CloseListener> deletedCloseListeners = new ArrayList<CloseListener>(closeListeners);
+         closeListeners.clear();
+         return deletedCloseListeners;
+      }
+   }
+
+   /**
+    * Replaces the registered close listeners with the given ones. The clear and the add happen under
+    * the list lock, mirroring {@link #setFailureListeners(List)}.
+    */
+   @Override
+   public void setCloseListeners(List<CloseListener> listeners)
+   {
+      synchronized (closeListeners)
+      {
+         closeListeners.clear();
+         closeListeners.addAll(listeners);
+      }
+   }
+
+   /**
+    * @return the live (synchronized) list of failure listeners, not a copy
+    */
+   @Override
+   public List<FailureListener> getFailureListeners()
+   {
+      return failureListeners;
+   }
+
+   /**
+    * Removes every registered failure listener.
+    *
+    * @return a copy of the listeners that were registered before the call
+    */
+   @Override
+   public List<FailureListener> removeFailureListeners()
+   {
+      synchronized (failureListeners)
+      {
+         List<FailureListener> deletedFailureListeners = new ArrayList<FailureListener>(failureListeners);
+         failureListeners.clear();
+         return deletedFailureListeners;
+      }
+   }
+
+   /**
+    * Replaces the registered failure listeners with the given ones.
+    */
+   @Override
+   public void setFailureListeners(List<FailureListener> listeners)
+   {
+      synchronized (failureListeners)
+      {
+         failureListeners.clear();
+         failureListeners.addAll(listeners);
+      }
+   }
+
+   @Override
+   public ActiveMQBuffer createTransportBuffer(int size)
+   {
+      return transportConnection.createTransportBuffer(size);
+   }
+
+   /**
+    * Notifies every registered failure listener of the failure (reported as not failed over).
+    */
+   @Override
+   public void fail(ActiveMQException me)
+   {
+      // Iterate over a snapshot: listeners are foreign code and may (un)register listeners themselves.
+      for (FailureListener listener : snapshotFailureListeners())
+      {
+         listener.connectionFailed(me, false);
+      }
+   }
+
+   /**
+    * Notifies every registered failure listener of the failure, handing the scale down target node ID
+    * through to the listener.
+    */
+   @Override
+   public void fail(ActiveMQException me, String scaleDownTargetNodeID)
+   {
+      for (FailureListener listener : snapshotFailureListeners())
+      {
+         //FIXME(mtaylor) How do we check if the node has failed over?
+         listener.connectionFailed(me, false, scaleDownTargetNodeID);
+      }
+   }
+
+   /**
+    * Marks this connection as destroyed and force closes the transport connection.
+    */
+   @Override
+   public void destroy()
+   {
+      //TODO(mtaylor) ensure this properly destroys this connection.
+      destroyed = true;
+      disconnect(false);
+   }
+
+   @Override
+   public Connection getTransportConnection()
+   {
+      return transportConnection;
+   }
+
+   /**
+    * @return always {@code false}
+    */
+   @Override
+   public boolean isClient()
+   {
+      return false;
+   }
+
+   @Override
+   public boolean isDestroyed()
+   {
+      return destroyed;
+   }
+
+   /**
+    * Force closes the transport connection; the {@code criticalError} flag is not used.
+    */
+   @Override
+   public void disconnect(boolean criticalError)
+   {
+      transportConnection.forceClose();
+   }
+
+   /**
+    * Force closes the transport connection; neither argument is used.
+    */
+   @Override
+   public void disconnect(String scaleDownNodeID, boolean criticalError)
+   {
+      disconnect(criticalError);
+   }
+
+   /**
+    * Records that data has arrived since the last {@link #checkDataReceived()} call.
+    */
+   protected void dataReceived()
+   {
+      dataReceived.set(true);
+   }
+
+   /**
+    * @return {@code true} if data was flagged as received since the previous call; the flag is reset
+    *         atomically as part of the check
+    */
+   @Override
+   public boolean checkDataReceived()
+   {
+      return dataReceived.compareAndSet(true, false);
+   }
+
+   @Override
+   public void flush()
+   {
+      transportConnection.checkFlushBatchBuffer();
+   }
+
+   /**
+    * Intentionally empty: this class does nothing with buffers handed to it.
+    */
+   @Override
+   public void bufferReceived(Object connectionID, ActiveMQBuffer buffer)
+   {
+   }
+
+   public void setConnected(boolean connected)
+   {
+      this.connected = connected;
+   }
+
+   public boolean getConnected()
+   {
+      return connected;
+   }
+
+   /**
+    * Copies the failure listeners under the list lock so they can be invoked without holding it.
+    */
+   private List<FailureListener> snapshotFailureListeners()
+   {
+      synchronized (failureListeners)
+      {
+         return new ArrayList<FailureListener>(failureListeners);
+      }
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
new file mode 100644
index 0000000..e4433d2
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
+import org.apache.activemq.artemis.utils.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * MQTTConnectionManager is responsible for handling Connect and Disconnect packets and any resulting
+ * behaviour of these events.
+ */
+public class MQTTConnectionManager
+{
+   private final MQTTSession session;
+
+   //TODO Read in a list of existing client IDs from stored Sessions.
+   public static Set<String> CONNECTED_CLIENTS = new ConcurrentHashSet<String>();
+
+   private final MQTTLogger log = MQTTLogger.LOGGER;
+
+   /**
+    * Creates the manager for the given session and registers an {@link MQTTFailureListener} on the
+    * session's connection so that a connection failure ends up in {@link #disconnect()}.
+    */
+   public MQTTConnectionManager(MQTTSession session)
+   {
+      this.session = session;
+      MQTTFailureListener failureListener = new MQTTFailureListener(this);
+      session.getConnection().addFailureListener(failureListener);
+   }
+
+   /**
+    * Handles the connect packet. See spec for details on each of parameters.
+    * <p>
+    * If the client ID is rejected a CONNACK with CONNECTION_REFUSED_IDENTIFIER_REJECTED is sent and the
+    * client is disconnected. Otherwise the session state is resolved, an internal server session is
+    * created and started, the will message (if any) is stored on the session state and a CONNACK with
+    * CONNECTION_ACCEPTED is sent.
+    */
+   synchronized void connect(String cId, String username, String password, boolean will, String willMessage, String willTopic,
+                             boolean willRetain, int willQosLevel, boolean cleanSession) throws Exception
+   {
+      String clientId = validateClientId(cId, cleanSession);
+      if (clientId == null)
+      {
+         session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
+         session.getProtocolHandler().disconnect();
+         return;
+      }
+
+      session.setSessionState(getSessionState(clientId, cleanSession));
+
+      ServerSessionImpl serverSession = createServerSession(username, password);
+      serverSession.start();
+
+      session.setServerSession(serverSession);
+
+      if (will)
+      {
+         ServerMessage w = MQTTUtil.createServerMessageFromString(session, willMessage, willTopic, willQosLevel, willRetain);
+         session.getSessionState().setWillMessage(w);
+      }
+
+      session.getConnection().setConnected(true);
+      session.start();
+      session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED);
+   }
+
+   /**
+    * Creates an internal Server Session, named with a freshly generated UUID, on the server that
+    * belongs to the MQTT session.
+    *
+    * @param username user name passed to the server when creating the session
+    * @param password password passed to the server when creating the session
+    * @return the newly created server session
+    * @throws Exception if the server fails to create the session
+    */
+   ServerSessionImpl createServerSession(String username, String password) throws Exception
+   {
+      String id = UUIDGenerator.getInstance().generateStringUUID();
+      ActiveMQServer server = session.getServer();
+
+      ServerSession serverSession = server.createSession(id,
+                                                         username,
+                                                         password,
+                                                         ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                         session.getConnection(),
+                                                         MQTTUtil.SESSION_AUTO_COMMIT_SENDS,
+                                                         MQTTUtil.SESSION_AUTO_COMMIT_ACKS,
+                                                         MQTTUtil.SESSION_PREACKNOWLEDGE,
+                                                         MQTTUtil.SESSION_XA,
+                                                         null,
+                                                         session.getSessionCallback(),
+                                                         null, // Session factory
+                                                         MQTTUtil.SESSION_AUTO_CREATE_QUEUE);
+      return (ServerSessionImpl) serverSession;
+   }
+
+   /**
+    * Releases the client ID, sends the will message if one is set, stops the session and closes the
+    * connection. Errors are logged rather than propagated; the transport connection is closed even
+    * when one of the earlier steps fails.
+    */
+   void disconnect()
+   {
+      if (session == null)
+      {
+         return;
+      }
+
+      try
+      {
+         if (session.getSessionState() != null)
+         {
+            String clientId = session.getSessionState().getClientId();
+            if (clientId != null)
+            {
+               CONNECTED_CLIENTS.remove(clientId);
+            }
+
+            if (session.getState().isWill())
+            {
+               session.getConnectionManager().sendWill();
+            }
+         }
+         session.stop();
+      }
+      catch (Exception e)
+      {
+         /* FIXME Failure during disconnect would leave the session state in an unrecoverable state. We should handle
+            errors more gracefully.
+          */
+         log.error("Error disconnecting client: " + e.getMessage(), e);
+      }
+      finally
+      {
+         // A failed will publish or session stop must not leave the network connection open.
+         closeConnection();
+      }
+   }
+
+   /**
+    * Disconnects and destroys the session's connection, logging (not propagating) any failure.
+    */
+   private void closeConnection()
+   {
+      try
+      {
+         session.getConnection().disconnect(false);
+         session.getConnection().destroy();
+      }
+      catch (Exception e)
+      {
+         log.error("Error disconnecting client: " + e.getMessage(), e);
+      }
+   }
+
+   /**
+    * Sends the stored will message through the server session and then deletes it from the session state.
+    */
+   private void sendWill() throws Exception
+   {
+      session.getServerSession().send(session.getSessionState().getWillMessage(), true);
+      session.getSessionState().deleteWillMessage();
+   }
+
+   /**
+    * Resolves the session state to use for the given client.
+    *
+    * @param clientId     the (validated) client ID
+    * @param cleanSession the clean session flag of the connect packet
+    * @return a new state when cleanSession is set or no state is stored, otherwise the stored state
+    * @throws InterruptedException if interrupted while waiting for a stored state to become detached
+    */
+   private MQTTSessionState getSessionState(String clientId, boolean cleanSession) throws InterruptedException
+   {
+      MQTTSessionState state;
+      synchronized (MQTTSession.SESSIONS)
+      {
+         /* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
+          * start a new one This Session lasts as long as the Network Connection. State data associated with this Session
+          * MUST NOT be reused in any subsequent Session */
+         if (cleanSession)
+         {
+            MQTTSession.SESSIONS.remove(clientId);
+            return new MQTTSessionState(clientId);
+         }
+
+         /* [MQTT-3.1.2-4] Attach an existing session if one exists (if cleanSession flag is false) otherwise create
+            a new one. */
+         state = MQTTSession.SESSIONS.get(clientId);
+         if (state == null)
+         {
+            state = new MQTTSessionState(clientId);
+            MQTTSession.SESSIONS.put(clientId, state);
+            return state;
+         }
+      }
+
+      // Wait outside of the SESSIONS lock: sleeping while holding it would stall every other client
+      // that is connecting until this state is detached.
+      // TODO Add a count down latch for handling wait during attached session state.
+      while (state.getAttached())
+      {
+         Thread.sleep(1000);
+      }
+      return state;
+   }
+
+   /**
+    * Validates the client ID of a connect packet.
+    *
+    * @return the client ID to use (a random UUID when none was supplied and cleanSession is set), or
+    *         {@code null} when the ID must be rejected
+    */
+   private String validateClientId(String clientId, boolean cleanSession)
+   {
+      if (clientId == null || clientId.isEmpty())
+      {
+         // [MQTT-3.1.3-7] [MQTT-3.1.3-6] If client does not specify a client ID and clean session is set to 1 create it.
+         if (cleanSession)
+         {
+            clientId = UUID.randomUUID().toString();
+         }
+         else
+         {
+            // [MQTT-3.1.3-8] Return ID rejected and disconnect if clean session = false and client id is null
+            return null;
+         }
+      }
+      // If the client ID is not unique (i.e. it has already registered) then do not accept it.
+      else if (!CONNECTED_CLIENTS.add(clientId))
+      {
+         // [MQTT-3.1.3-9] Return ID Rejected if server rejects the client ID
+         return null;
+      }
+      return clientId;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTFailureListener.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTFailureListener.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTFailureListener.java
new file mode 100644
index 0000000..b33bc5e
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTFailureListener.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+
+/**
+ * {@link FailureListener} that forwards every connection failure notification to the
+ * {@link MQTTConnectionManager} it was created with, by calling its disconnect routine.
+ */
+public class MQTTFailureListener implements FailureListener
+{
+   private final MQTTConnectionManager connectionManager;
+
+   /**
+    * @param connectionManager the manager that is asked to disconnect when a failure is reported
+    */
+   public MQTTFailureListener(MQTTConnectionManager connectionManager)
+   {
+      this.connectionManager = connectionManager;
+   }
+
+   /**
+    * Same handling as the three argument variant; the arguments are not inspected.
+    */
+   @Override
+   public void connectionFailed(ActiveMQException exception, boolean failedOver)
+   {
+      connectionFailed(exception, failedOver, null);
+   }
+
+   /**
+    * Disconnects via the connection manager; none of the arguments are inspected.
+    */
+   @Override
+   public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID)
+   {
+      connectionManager.disconnect();
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
new file mode 100644
index 0000000..ab3b221
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import org.jboss.logging.BasicLogger;
+import org.jboss.logging.Logger;
+import org.jboss.logging.annotations.MessageLogger;
+
+/**
+ * Logger Code 83
+ *
+ * Each message id must be 6 digits long starting with 83 (the logger code), the 3rd digit denotes the level so
+ *
+ * INFO 1
+ * WARN 2
+ * DEBUG 3
+ * ERROR 4
+ * TRACE 5
+ * FATAL 6
+ *
+ * so an INFO message would be 831000 to 831999
+ */
+
+@MessageLogger(projectCode = "AMQ")
+public interface MQTTLogger extends BasicLogger
+{
+ MQTTLogger LOGGER = Logger.getMessageLogger(MQTTLogger.class, MQTTLogger.class.getPackage().getName());
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTMessageInfo.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTMessageInfo.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTMessageInfo.java
new file mode 100644
index 0000000..e20119d
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTMessageInfo.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+/**
+ * Immutable holder for the details kept about a message: the server side message ID, the ID of the
+ * consumer and the address. MQTT acks only carry a message ID, so the internal message ID and
+ * consumer have to be looked up from an instance of this class.
+ */
+class MQTTMessageInfo
+{
+   private final long serverMessageId;
+
+   private final long consumerId;
+
+   private final String address;
+
+   /**
+    * @param serverMessageId the server side message ID
+    * @param consumerId      the consumer ID
+    * @param address         the address
+    */
+   MQTTMessageInfo(long serverMessageId, long consumerId, String address)
+   {
+      this.serverMessageId = serverMessageId;
+      this.consumerId = consumerId;
+      this.address = address;
+   }
+
+   /**
+    * @return the server side message ID given at construction
+    */
+   long getServerMessageId()
+   {
+      return serverMessageId;
+   }
+
+   /**
+    * @return the consumer ID given at construction
+    */
+   long getConsumerId()
+   {
+      return consumerId;
+   }
+
+   /**
+    * @return the address given at construction
+    */
+   String getAddress()
+   {
+      return address;
+   }
+
+   /**
+    * @return a single line description listing the three values
+    */
+   @Override
+   public String toString()
+   {
+      StringBuilder text = new StringBuilder("ServerMessageId: ");
+      text.append(serverMessageId);
+      text.append(" ConsumerId: ").append(consumerId);
+      text.append(" addr: ").append(address);
+      return text.toString();
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
new file mode 100644
index 0000000..37610f8
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.mqtt.MqttConnAckMessage;
+import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
+import io.netty.handler.codec.mqtt.MqttConnectMessage;
+import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttPubAckMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.handler.codec.mqtt.MqttSubAckMessage;
+import io.netty.handler.codec.mqtt.MqttSubAckPayload;
+import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
+import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
+
+/**
+ * This class is responsible for receiving and sending MQTT packets, delegating behaviour to one of the
+ * MQTTConnectionManager, MQTTPublishManager, MQTTSubscriptionManager classes.
+ * <p>
+ * Inbound packets arrive through {@link #channelRead(ChannelHandlerContext, Object)} and are dispatched on their
+ * MQTT message type. The send* methods build the matching outbound control packets and write them to the channel
+ * context captured while handling the CONNECT packet.
+ */
+public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter
+{
+   private ConnectionEntry connectionEntry;
+
+   private MQTTConnection connection;
+
+   private MQTTSession session;
+
+   private final ActiveMQServer server;
+
+   // This Channel Handler is not sharable, therefore it can only ever be associated with a single ctx.
+   private ChannelHandlerContext ctx;
+
+   private final MQTTLogger log = MQTTLogger.LOGGER;
+
+   private boolean stopped = false;
+
+   public MQTTProtocolHandler(ActiveMQServer server)
+   {
+      this.server = server;
+   }
+
+   /**
+    * Binds this handler to its connection and creates the MQTT session for it.
+    *
+    * @param connection the MQTT connection served by this handler
+    * @param entry      the connection entry whose ttl is adjusted when the client sends CONNECT
+    */
+   void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception
+   {
+      this.connectionEntry = entry;
+      this.connection = connection;
+      this.session = new MQTTSession(this, connection);
+   }
+
+
+   /**
+    * Marks this handler as stopped; any packet read afterwards only triggers a disconnect.
+    *
+    * @param error currently unused
+    */
+   void stop(boolean error)
+   {
+      stopped = true;
+   }
+
+   /**
+    * Entry point for every decoded MQTT packet. Dispatches on the packet type; any exception raised while
+    * handling a packet results in the client being disconnected.
+    */
+   @Override
+   public void channelRead(ChannelHandlerContext ctx, Object msg)
+   {
+      // NOTE(review): msg is never released here; confirm whether the reference counted payload of PUBLISH
+      // packets is released elsewhere.
+      try
+      {
+         if (stopped)
+         {
+            disconnect();
+            return;
+         }
+
+         MqttMessage message = (MqttMessage) msg;
+
+         // Disconnect if Netty codec failed to decode the stream.
+         if (message.decoderResult().isFailure())
+         {
+            log.debug("Bad Message Disconnecting Client.");
+            disconnect();
+            return;
+         }
+
+         connection.dataReceived();
+
+         MQTTUtil.logMessage(log, message, true);
+
+         switch (message.fixedHeader().messageType())
+         {
+            case CONNECT:
+               handleConnect((MqttConnectMessage) message, ctx);
+               break;
+            case CONNACK:
+               handleConnack((MqttConnAckMessage) message);
+               break;
+            case PUBLISH:
+               handlePublish((MqttPublishMessage) message);
+               break;
+            case PUBACK:
+               handlePuback((MqttPubAckMessage) message);
+               break;
+            case PUBREC:
+               handlePubrec(message);
+               break;
+            case PUBREL:
+               handlePubrel(message);
+               break;
+            case PUBCOMP:
+               handlePubcomp(message);
+               break;
+            case SUBSCRIBE:
+               handleSubscribe((MqttSubscribeMessage) message, ctx);
+               break;
+            case SUBACK:
+               handleSuback((MqttSubAckMessage) message);
+               break;
+            case UNSUBSCRIBE:
+               handleUnsubscribe((MqttUnsubscribeMessage) message);
+               break;
+            case UNSUBACK:
+               handleUnsuback((MqttUnsubAckMessage) message);
+               break;
+            case PINGREQ:
+               handlePingreq(message, ctx);
+               break;
+            case PINGRESP:
+               handlePingresp(message);
+               break;
+            case DISCONNECT:
+               handleDisconnect(message);
+               break;
+            default:
+               disconnect();
+         }
+      }
+      catch (Exception e)
+      {
+         log.debug("Error processing Control Packet, Disconnecting Client: " + e.getMessage());
+         disconnect();
+      }
+   }
+
+   /**
+    * Called during connection. Captures the channel context used for all later outbound packets, derives the
+    * connection ttl from the client's keep alive and delegates the rest of the CONNECT handling to the
+    * connection manager.
+    *
+    * @param connect the CONNECT packet
+    * @param ctx     the channel context the packet arrived on
+    */
+   void handleConnect(MqttConnectMessage connect, ChannelHandlerContext ctx) throws Exception
+   {
+      this.ctx = ctx;
+
+      // The MQTT specification gives the client one and a half times its keep alive period (expressed in
+      // seconds) before the server must consider the connection dead, hence seconds * 1500.
+      // NOTE(review): a keep alive of 0 yields a ttl of 0; confirm how ConnectionEntry treats that value.
+      connectionEntry.ttl = connect.variableHeader().keepAliveTimeSeconds() * 1500;
+
+      String clientId = connect.payload().clientIdentifier();
+      session.getConnectionManager().connect(clientId,
+                                             connect.payload().userName(),
+                                             connect.payload().password(),
+                                             connect.variableHeader().isWillFlag(),
+                                             connect.payload().willMessage(),
+                                             connect.payload().willTopic(),
+                                             connect.variableHeader().isWillRetain(),
+                                             connect.variableHeader().willQos(),
+                                             connect.variableHeader().isCleanSession());
+   }
+
+   /** Asks the connection manager to disconnect the client. */
+   void disconnect()
+   {
+      session.getConnectionManager().disconnect();
+   }
+
+   /**
+    * Sends a CONNACK packet carrying the given return code.
+    *
+    * @param returnCode the outcome of the connection attempt
+    */
+   void sendConnack(MqttConnectReturnCode returnCode)
+   {
+      MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK,
+                                                        false,
+                                                        MqttQoS.AT_MOST_ONCE,
+                                                        false,
+                                                        0);
+      MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode);
+      MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader);
+
+      ctx.writeAndFlush(message);
+   }
+
+   /**
+    * The server does not instantiate connections therefore any CONNACK received over a connection is an invalid
+    * control message.
+    * @param message
+    */
+   void handleConnack(MqttConnAckMessage message)
+   {
+      log.debug("Received invalid CONNACK from client: " + session.getSessionState().getClientId());
+      log.debug("Disconnecting client: " + session.getSessionState().getClientId());
+      disconnect();
+   }
+
+   /** Hands an inbound PUBLISH packet to the publish manager. */
+   void handlePublish(MqttPublishMessage message) throws Exception
+   {
+      session.getMqttPublishManager().handleMessage(message.variableHeader().messageId(),
+                                                    message.variableHeader().topicName(),
+                                                    message.fixedHeader().qosLevel().value(),
+                                                    message.payload(),
+                                                    message.fixedHeader().isRetain());
+   }
+
+   void sendPubAck(int messageId)
+   {
+      sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBACK);
+   }
+
+   void sendPubRel(int messageId)
+   {
+      sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBREL);
+   }
+
+   void sendPubRec(int messageId)
+   {
+      sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBREC);
+   }
+
+   void sendPubComp(int messageId)
+   {
+      sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBCOMP);
+   }
+
+   /**
+    * Builds and sends one of the publish acknowledgement packets (PUBACK, PUBREC, PUBREL, PUBCOMP), which only
+    * carry a message ID.
+    *
+    * @param messageId   the MQTT message ID being acknowledged
+    * @param messageType the type of acknowledgement packet to send
+    */
+   void sendPublishProtocolControlMessage(int messageId, MqttMessageType messageType)
+   {
+      // Spec requires 01 in header for rel
+      MqttQoS qos = (messageType == MqttMessageType.PUBREL) ? MqttQoS.AT_LEAST_ONCE : MqttQoS.AT_MOST_ONCE;
+      MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType,
+                                                        false,
+                                                        qos,
+                                                        false,
+                                                        0);
+      MqttPubAckMessage ack = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId));
+      ctx.writeAndFlush(ack);
+   }
+
+   void handlePuback(MqttPubAckMessage message) throws Exception
+   {
+      session.getMqttPublishManager().handlePubAck(message.variableHeader().messageId());
+   }
+
+   void handlePubrec(MqttMessage message) throws Exception
+   {
+      int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
+      session.getMqttPublishManager().handlePubRec(messageId);
+   }
+
+   void handlePubrel(MqttMessage message)
+   {
+      int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
+      session.getMqttPublishManager().handlePubRel(messageId);
+   }
+
+   void handlePubcomp(MqttMessage message) throws Exception
+   {
+      int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
+      session.getMqttPublishManager().handlePubComp(messageId);
+   }
+
+   /**
+    * Registers the requested subscriptions and replies with a SUBACK carrying the QoS values returned by the
+    * subscription manager.
+    */
+   void handleSubscribe(MqttSubscribeMessage message, ChannelHandlerContext ctx) throws Exception
+   {
+      MQTTSubscriptionManager subscriptionManager = session.getSubscriptionManager();
+      int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions());
+
+      MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK,
+                                                   false,
+                                                   MqttQoS.AT_MOST_ONCE,
+                                                   false,
+                                                   0);
+      MqttSubAckMessage ack = new MqttSubAckMessage(header,
+                                                    message.variableHeader(),
+                                                    new MqttSubAckPayload(qos));
+      ctx.writeAndFlush(ack);
+   }
+
+   /** A SUBACK is never expected from a client; receiving one disconnects it. */
+   void handleSuback(MqttSubAckMessage message)
+   {
+      disconnect();
+   }
+
+   /** Removes the listed subscriptions and replies with an UNSUBACK echoing the request's variable header. */
+   void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception
+   {
+      session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
+      MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK,
+                                                   false,
+                                                   MqttQoS.AT_MOST_ONCE,
+                                                   false,
+                                                   0);
+      MqttUnsubAckMessage ack = new MqttUnsubAckMessage(header, message.variableHeader());
+      ctx.writeAndFlush(ack);
+   }
+
+   /** An UNSUBACK is never expected from a client; receiving one disconnects it. */
+   void handleUnsuback(MqttUnsubAckMessage message)
+   {
+      disconnect();
+   }
+
+   /** Answers a PINGREQ with a PINGRESP. */
+   void handlePingreq(MqttMessage message, ChannelHandlerContext ctx)
+   {
+      MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PINGRESP,
+                                                   false,
+                                                   MqttQoS.AT_MOST_ONCE,
+                                                   false,
+                                                   0);
+      ctx.writeAndFlush(new MqttMessage(header));
+   }
+
+   /** A PINGRESP is never expected from a client; receiving one disconnects it. */
+   void handlePingresp(MqttMessage message)
+   {
+      disconnect();
+   }
+
+   /** Handles a client DISCONNECT: the will message is deleted before the connection is closed. */
+   void handleDisconnect(MqttMessage message)
+   {
+      // Use the same accessor for the null check and the call so the guard protects the dereference.
+      if (session.getSessionState() != null)
+      {
+         session.getSessionState().deleteWillMessage();
+      }
+      disconnect();
+   }
+
+
+   /**
+    * Sends a PUBLISH packet to the client.
+    *
+    * @param messageId     the MQTT message ID to put in the packet
+    * @param topicName     the topic the message is published on
+    * @param qosLevel      the QoS level (0, 1 or 2) of the packet
+    * @param payload       the message body
+    * @param deliveryCount the delivery count of the message; a value above zero sets the first flag of the
+    *                      fixed header for QoS levels other than 0
+    * @return always 1
+    */
+   protected int send(int messageId, String topicName, int qosLevel, ByteBuf payload, int deliveryCount)
+   {
+      boolean redelivery = qosLevel != 0 && deliveryCount > 0;
+      MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH,
+                                                   redelivery,
+                                                   MqttQoS.valueOf(qosLevel),
+                                                   false,
+                                                   0);
+      MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
+      MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
+
+      ctx.writeAndFlush(publish);
+
+      return 1;
+   }
+
+   ActiveMQServer getServer()
+   {
+      return server;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
new file mode 100644
index 0000000..b92a09f
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.mqtt.MqttDecoder;
+import io.netty.handler.codec.mqtt.MqttEncoder;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.management.Notification;
+import org.apache.activemq.artemis.core.server.management.NotificationListener;
+import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
+import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+
+import java.util.List;
+
+/**
+ * MQTTProtocolManager
+ */
+class MQTTProtocolManager implements ProtocolManager, NotificationListener
+{
+ private ActiveMQServer server;
+
+ private MQTTLogger log = MQTTLogger.LOGGER;
+
+ public MQTTProtocolManager(ActiveMQServer server)
+ {
+ this.server = server;
+ }
+
+ @Override
+ public void onNotification(Notification notification)
+ {
+ // TODO handle notifications
+ }
+
+
+ @Override
+ public ProtocolManagerFactory getFactory()
+ {
+ return new MQTTProtocolManagerFactory();
+ }
+
+ @Override
+ public void updateInterceptors(List incomingInterceptors, List outgoingInterceptors)
+ {
+ // TODO handle interceptors
+ }
+
+ @Override
+ public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection)
+ {
+ try
+ {
+ MQTTConnection mqttConnection = new MQTTConnection(connection);
+ ConnectionEntry entry = new ConnectionEntry(mqttConnection,
+ null,
+ System.currentTimeMillis(),
+ MQTTUtil.DEFAULT_KEEP_ALIVE_FREQUENCY);
+
+ NettyServerConnection nettyConnection = ((NettyServerConnection) connection);
+ MQTTProtocolHandler protocolHandler = nettyConnection.getChannel().pipeline().get(MQTTProtocolHandler.class);
+ protocolHandler.setConnection(mqttConnection, entry);
+ return entry;
+ }
+ catch (Exception e)
+ {
+ log.error(e);
+ return null;
+ }
+ }
+
+ @Override
+ public void removeHandler(String name)
+ {
+ // TODO add support for handlers
+ }
+
+ @Override
+ public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer)
+ {
+ connection.bufferReceived(connection.getID(), buffer);
+ }
+
+ @Override
+ public void addChannelHandlers(ChannelPipeline pipeline)
+ {
+ pipeline.addLast(new MqttEncoder());
+ pipeline.addLast(new MqttDecoder(MQTTUtil.MAX_MESSAGE_SIZE));
+
+ pipeline.addLast(new MQTTProtocolHandler(server));
+ }
+
+ @Override
+ public boolean isProtocol(byte[] array)
+ {
+ boolean mqtt311 = array[4] == 77 && // M
+ array[5] == 81 && // Q
+ array[6] == 84 && // T
+ array[7] == 84; // T
+
+ // FIXME The actual protocol name is 'MQIsdp' (However we are only passed the first 4 bytes of the protocol name)
+ boolean mqtt31 = array[4] == 77 && // M
+ array[5] == 81 && // Q
+ array[6] == 73 && // I
+ array[7] == 115; // s
+ return mqtt311 || mqtt31;
+ }
+
+ @Override
+ public MessageConverter getConverter()
+ {
+ return null;
+ }
+
+ @Override
+ public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer)
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
new file mode 100644
index 0000000..7194d02
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import java.util.List;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
+
+/**
+ * Factory creating {@link MQTTProtocolManager} instances for the "MQTT" protocol.
+ */
+public class MQTTProtocolManagerFactory implements ProtocolManagerFactory
+{
+   public static final String MQTT_PROTOCOL_NAME = "MQTT";
+
+   private static final String MODULE_NAME = "artemis-mqtt-protocol";
+
+   private static final String[] SUPPORTED_PROTOCOLS = {MQTT_PROTOCOL_NAME};
+
+   /**
+    * @param server               the server the protocol manager works for
+    * @param incomingInterceptors currently ignored
+    * @param outgoingInterceptors currently ignored
+    * @return a new MQTT protocol manager
+    */
+   @Override
+   public ProtocolManager createProtocolManager(ActiveMQServer server, List incomingInterceptors, List outgoingInterceptors)
+   {
+      return new MQTTProtocolManager(server);
+   }
+
+   /**
+    * Interceptors are not supported yet, so nothing passes the filter.
+    *
+    * @return an empty list, never {@code null}
+    */
+   @Override
+   public List filterInterceptors(List list)
+   {
+      // TODO Add support for interceptors.
+      return java.util.Collections.emptyList();
+   }
+
+   /** @return the names of the supported protocols; a fresh copy, so callers cannot alter the shared array */
+   @Override
+   public String[] getProtocols()
+   {
+      return SUPPORTED_PROTOCOLS.clone();
+   }
+
+   @Override
+   public String getModuleName()
+   {
+      return MODULE_NAME;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
new file mode 100644
index 0000000..aa3f9e0
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.EmptyByteBuf;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.journal.IOAsyncTask;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+
+/**
+ * Handles the MQTT publish flows of a session: inbound PUBLISH packets, outbound delivery of server messages
+ * and the acknowledgement exchanges for QoS 1 (PUBACK) and QoS 2 (PUBREC / PUBREL / PUBCOMP).
+ * <p>
+ * PUBREL messages are stored on a per client management queue (named {@link #MANAGEMENT_QUEUE_PREFIX} plus the
+ * client ID) and delivered through a dedicated management consumer, which allows PUBREL to be retried.
+ */
+public class MQTTPublishManager
+{
+   private static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
+
+   private SimpleString managementAddress;
+
+   private ServerConsumer managementConsumer;
+
+   private final MQTTSession session;
+
+   private final MQTTLogger log = MQTTLogger.LOGGER;
+
+   // Guards handleMessage; start() and stop() synchronize on this instance instead.
+   private final Object lock = new Object();
+
+   public MQTTPublishManager(MQTTSession session)
+   {
+      this.session = session;
+   }
+
+   /** Creates the management address, the management queue (if missing) and the management consumer. */
+   synchronized void start() throws Exception
+   {
+      createManagementAddress();
+      createManagementQueue();
+      createManagementConsumer();
+   }
+
+   /**
+    * Closes the management consumer, if one was created.
+    *
+    * @param clean when {@code true} the management queue is destroyed as well
+    */
+   synchronized void stop(boolean clean) throws Exception
+   {
+      if (managementConsumer != null)
+      {
+         managementConsumer.removeItself();
+         managementConsumer.setStarted(false);
+         managementConsumer.close(false);
+         if (clean)
+         {
+            session.getServer().destroyQueue(managementAddress);
+         }
+      }
+   }
+
+   private void createManagementConsumer() throws Exception
+   {
+      long consumerId = session.getServer().getStorageManager().generateID();
+      managementConsumer = session.getServerSession().createConsumer(consumerId, managementAddress, null, false, false, -1);
+      managementConsumer.setStarted(true);
+   }
+
+   private void createManagementAddress()
+   {
+      String clientId = session.getSessionState().getClientId();
+      managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + clientId);
+   }
+
+   private void createManagementQueue() throws Exception
+   {
+      if (session.getServer().locateQueue(managementAddress) == null)
+      {
+         session.getServerSession().createQueue(managementAddress, managementAddress, null, false, MQTTUtil.DURABLE_MESSAGES);
+      }
+   }
+
+   /** @return {@code true} when the given consumer is this manager's management consumer (identity check) */
+   boolean isManagementConsumer(ServerConsumer consumer)
+   {
+      return consumer == managementConsumer;
+   }
+
+   /**
+    * Generates the MQTT message ID for an outbound message.
+    * <p>
+    * NOTE(review): only the non QoS 1 path falls back to a storage manager ID when the session state returns
+    * no ID; confirm whether generateId() can return null on the QoS 1 path too, and whether the truncated
+    * storage ID is an acceptable MQTT message ID.
+    */
+   private int generateMqttId(int qos)
+   {
+      if (qos == 1)
+      {
+         return session.getSessionState().generateId();
+      }
+      else
+      {
+         Integer mqttid = session.getSessionState().generateId();
+         if (mqttid == null)
+         {
+            mqttid = (int) session.getServer().getStorageManager().generateID();
+         }
+         return mqttid;
+      }
+   }
+
+   /** Since MQTT Subscriptions can over lap; a client may receive the same message twice. When this happens the client
+    * returns a PubRec or PubAck with ID. But we need to know which consumer to ack, since we only have the ID to go on we
+    * are not able to decide which consumer to ack. Instead we send MQTT messages with different IDs and store a reference
+    * to original ID and consumer in the Session state. This way we can look up the consumer Id and the message Id from
+    * the PubAck or PubRec message id. **/
+   protected void sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) throws Exception
+   {
+      // This is to allow retries of PubRel.
+      if (isManagementConsumer(consumer))
+      {
+         sendPubRelMessage(message);
+      }
+      else
+      {
+         int qos = decideQoS(message, consumer);
+         if (qos == 0)
+         {
+            // QoS 0 gets no acknowledgement from the client, so the message is acknowledged straight away.
+            sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos);
+            session.getServerSession().acknowledge(consumer.getID(), message.getMessageID());
+         }
+         else
+         {
+            String consumerAddress = consumer.getQueue().getAddress().toString();
+            Integer mqttid = generateMqttId(qos);
+
+            session.getSessionState().addOutbandMessageRef(mqttid, consumerAddress, message.getMessageID(), qos);
+            sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos);
+         }
+      }
+   }
+
+   // INBOUND
+   /**
+    * Handles a PUBLISH packet received from the client: sends the message into the server, updates the retained
+    * message when requested and schedules the acknowledgement matching the QoS level.
+    *
+    * @param messageId the MQTT message ID of the packet
+    * @param topic     the topic the message was published on
+    * @param qos       the QoS level of the packet
+    * @param payload   the message body
+    * @param retain    whether the retain flag was set
+    */
+   void handleMessage(int messageId, String topic, int qos, ByteBuf payload, boolean retain) throws Exception
+   {
+      synchronized (lock)
+      {
+         ServerMessage serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, topic, retain, qos, payload);
+
+         if (qos > 0)
+         {
+            serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
+         }
+
+         // A QoS 2 message whose ID is already recorded as PubRec'd is a duplicate and is not sent again.
+         if (qos < 2 || !session.getSessionState().getPubRec().contains(messageId))
+         {
+            if (qos == 2)
+            {
+               session.getSessionState().getPubRec().add(messageId);
+            }
+            session.getServerSession().send(serverMessage, true);
+         }
+
+
+         if (retain)
+         {
+            // An empty payload resets the retained message instead of replacing it.
+            boolean reset = payload instanceof EmptyByteBuf || payload.capacity() == 0;
+            session.getRetainMessageManager().handleRetainedMessage(serverMessage, topic, reset);
+         }
+
+         createMessageAck(messageId, qos);
+      }
+   }
+
+   /**
+    * Sends a PUBREL to the client for a message delivered from the management queue. Messages whose stored
+    * type is not PUBREL are ignored.
+    */
+   void sendPubRelMessage(ServerMessage message)
+   {
+      if (message.getIntProperty(MQTTUtil.MQTT_MESSAGE_TYPE_KEY) == MqttMessageType.PUBREL.value())
+      {
+         int messageId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY);
+         MQTTMessageInfo messageInfo = new MQTTMessageInfo(message.getMessageID(), managementConsumer.getID(), message.getAddress().toString());
+         session.getSessionState().storeMessageRef(messageId, messageInfo, false);
+         session.getProtocolHandler().sendPubRel(messageId);
+      }
+   }
+
+   /**
+    * Registers a task with the storage manager that sends the PUBACK (QoS 1) or PUBREC (QoS 2) once the pending
+    * operations have completed. Nothing is sent for QoS 0.
+    */
+   private void createMessageAck(final int messageId, final int qos)
+   {
+      session.getServer().getStorageManager().afterCompleteOperations(new IOAsyncTask()
+      {
+         @Override
+         public void done()
+         {
+            if (qos == 1)
+            {
+               session.getProtocolHandler().sendPubAck(messageId);
+            }
+            else if (qos == 2)
+            {
+               session.getProtocolHandler().sendPubRec(messageId);
+            }
+         }
+
+         @Override
+         public void onError(int errorCode, String errorMessage)
+         {
+            // Keep the failure details so the cause can be diagnosed from the log.
+            log.error("Pub Sync Failed: " + errorCode + " " + errorMessage);
+         }
+      });
+   }
+
+   /**
+    * Handles a PUBREC from the client: stores a PUBREL message on the management queue, acknowledges the
+    * original message and sends PUBREL. Unknown message IDs are ignored.
+    */
+   void handlePubRec(int messageId) throws Exception
+   {
+      MQTTMessageInfo messageRef = session.getSessionState().getMessageInfo(messageId);
+      if (messageRef != null)
+      {
+         ServerMessage pubRel = MQTTUtil.createPubRelMessage(session, managementAddress, messageId);
+         session.getServerSession().send(pubRel, true);
+         session.getServerSession().acknowledge(messageRef.getConsumerId(), messageRef.getServerMessageId());
+         session.getProtocolHandler().sendPubRel(messageId);
+      }
+   }
+
+   /** Handles a PUBCOMP from the client by acknowledging the stored message on the management consumer. */
+   void handlePubComp(int messageId) throws Exception
+   {
+      MQTTMessageInfo messageInfo = session.getSessionState().getMessageInfo(messageId);
+
+      // Check to see if this message is stored if not just drop the packet.
+      if (messageInfo != null)
+      {
+         session.getServerSession().acknowledge(managementConsumer.getID(), messageInfo.getServerMessageId());
+      }
+   }
+
+   /** Handles a PUBREL from the client: forgets the recorded PUBREC, replies with PUBCOMP and drops the ref. */
+   void handlePubRel(int messageId)
+   {
+      // We don't check to see if a PubRel existed for this message. We assume it did and so send PubComp.
+      session.getSessionState().getPubRec().remove(messageId);
+      session.getProtocolHandler().sendPubComp(messageId);
+      session.getSessionState().removeMessageRef(messageId);
+   }
+
+
+   /**
+    * Handles a PUBACK from the client by acknowledging the matching server message on the consumer registered
+    * for its address. Unknown message IDs are ignored.
+    */
+   void handlePubAck(int messageId) throws Exception
+   {
+      Pair<String, Long> pub1MessageInfo = session.getSessionState().removeOutbandMessageRef(messageId, 1);
+      if (pub1MessageInfo != null)
+      {
+         String mqttAddress = MQTTUtil.convertCoreAddressFilterToMQTT(pub1MessageInfo.getA());
+         ServerConsumer consumer = session.getSubscriptionManager().getConsumerForAddress(mqttAddress);
+
+         // Without a consumer for the address there is nothing to acknowledge; skip rather than fail with a
+         // NullPointerException (presumably the case once the subscription is gone - TODO confirm).
+         if (consumer != null)
+         {
+            session.getServerSession().acknowledge(consumer.getID(), pub1MessageInfo.getB());
+         }
+      }
+   }
+
+   /** Converts the message's core address to its MQTT form and sends the message body as a PUBLISH packet. */
+   private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos)
+   {
+      String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString());
+
+      //FIXME should we be copying the body buffer here?
+      ByteBuf payload = message.getBodyBufferCopy().byteBuf();
+      session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
+   }
+
+   /** @return the lower of the subscription QoS of the consumer and the QoS the message was published with */
+   private int decideQoS(ServerMessage message, ServerConsumer consumer)
+   {
+      int subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID());
+      int qos = message.getIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY);
+
+      /* Subscription QoS is the maximum QoS the client is willing to receive for this subscription. If the message QoS
+      is less than the subscription QoS then use it, otherwise use the subscription qos). */
+      return Math.min(subscriptionQoS, qos);
+   }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
new file mode 100644
index 0000000..c1fd17f
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+
+import java.util.Iterator;
+
+
+public class MQTTRetainMessageManager // Keeps MQTT retained messages in per address queues and copies them onto subscription queues.
+{
+ private MQTTSession session;
+
+ public MQTTRetainMessageManager(MQTTSession session)
+ {
+ this.session = session;
+ }
+
+ /** FIXME
+ * Retained messages should be handled in the core API. There is currently no support for retained messages
+ * at the time of writing. Instead we handle retained messages here. This method will create a new queue for
+ * every address that is used to store retained messages. There should only ever be one message in the retained
+ * message queue. When a new subscription is created the queue should be browsed and the message copied onto
+ * the subscription queue for the consumer. When a new retained message is received the message will be sent to
+ * the retained queue and the previous retain message consumed to remove it from the queue. */
+ void handleRetainedMessage(ServerMessage message, String address, boolean reset) throws Exception
+ {
+ SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address));
+
+ if (!session.getServerSession().executeQueueQuery(retainAddress).isExists()) // create the retain queue on first use
+ {
+ session.getServerSession().createQueue(retainAddress, retainAddress, null, false, true);
+ }
+ Queue queue = session.getServer().locateQueue(retainAddress);
+
+ // Set the address of this message to the retained queue. Note that this changes the message passed in by the caller.
+ message.setAddress(retainAddress);
+
+ Iterator<MessageReference> iterator = queue.iterator();
+ synchronized (iterator) // NOTE(review): the lock object is the iterator obtained on the line above; confirm this gives the intended mutual exclusion
+ {
+ if (iterator.hasNext()) // delete the first message currently on the retain queue, if there is one
+ {
+ Long messageId = iterator.next().getMessage().getMessageID();
+ queue.deleteReference(messageId);
+ }
+
+ if (!reset) // when reset is true the old message is only removed and nothing new is stored
+ {
+ session.getServerSession().send(message.copy(), true);
+ }
+ }
+ }
+
+ void addRetainedMessagesToQueue(SimpleString queueName, String address) throws Exception // copies the head message of every matching retain queue onto the named queue
+ {
+ // Queue to add the retained messages to
+ Queue queue = session.getServer().locateQueue(queueName);
+
+ // The address filter that matches all retained message queues.
+ String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address);
+ BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(new SimpleString(retainAddress));
+
+ // Iterate over all matching retain queues and add the head message to the original queue.
+ for (SimpleString retainedQueueName : bindingQueryResult.getQueueNames())
+ {
+ Queue retainedQueue = session.getServer().locateQueue(retainedQueueName);
+ synchronized (this) // the lock here is this manager instance, unlike handleRetainedMessage which locks on an iterator
+ {
+ Iterator<MessageReference> i = retainedQueue.iterator();
+ if (i.hasNext())
+ {
+ ServerMessage message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID()); // the copy gets a newly generated ID
+ queue.addTail(message.createReference(queue), true);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
new file mode 100644
index 0000000..e3516f1
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Groups the objects that make up one MQTT session: the protocol handler and connection handed
+ * to the constructor, the managers created for this session, and the {@link MQTTSessionState}
+ * and {@link ServerSessionImpl} that are supplied later through the setters.
+ */
+public class MQTTSession
+{
+   // Shared registry of session states. This class never touches it itself.
+   // NOTE(review): presumably keyed by client id - confirm against the code that populates it.
+   static Map<String, MQTTSessionState> SESSIONS = new ConcurrentHashMap<>();
+
+   // Random identifier, only used in the debug log line written by the constructor.
+   private final String id = UUID.randomUUID().toString();
+
+   private final MQTTLogger log = MQTTLogger.LOGGER;
+
+   private final MQTTProtocolHandler protocolHandler;
+
+   private final MQTTConnection connection;
+
+   private final MQTTConnectionManager mqttConnectionManager;
+
+   private final MQTTPublishManager mqttPublishManager;
+
+   private final MQTTSessionCallback sessionCallback;
+
+   private final MQTTSubscriptionManager subscriptionManager;
+
+   private final MQTTRetainMessageManager retainMessageManager;
+
+   // Supplied after construction via setServerSession(); null until then.
+   private ServerSessionImpl serverSession;
+
+   // Supplied after construction via setSessionState(); null until then.
+   protected MQTTSessionState state;
+
+   private boolean stopped = false;
+
+   /**
+    * Creates the session and one instance of each manager, every one of them bound to this session.
+    *
+    * @param protocolHandler handler that owns this session
+    * @param connection      connection this session belongs to
+    * @throws Exception if one of the managers cannot be created
+    */
+   public MQTTSession(MQTTProtocolHandler protocolHandler, MQTTConnection connection) throws Exception
+   {
+      this.protocolHandler = protocolHandler;
+      this.connection = connection;
+
+      this.mqttConnectionManager = new MQTTConnectionManager(this);
+      this.mqttPublishManager = new MQTTPublishManager(this);
+      this.sessionCallback = new MQTTSessionCallback(this);
+      this.subscriptionManager = new MQTTSubscriptionManager(this);
+      this.retainMessageManager = new MQTTRetainMessageManager(this);
+
+      log.debug("SESSION CREATED: " + id);
+   }
+
+   /**
+    * Starts the publish manager and then the subscription manager and clears the stopped flag.
+    * Called after the client has Connected.
+    *
+    * @throws Exception if either manager fails to start
+    */
+   synchronized void start() throws Exception
+   {
+      mqttPublishManager.start();
+      subscriptionManager.start();
+      stopped = false;
+   }
+
+   /**
+    * Stops the protocol handler and the managers, stops and closes the server session when one is
+    * set, and marks the session state (when one is set) as detached. Does nothing when the session
+    * is already stopped. The stopped flag is only raised once every step has completed.
+    *
+    * @throws Exception if any of the shutdown steps fails
+    */
+   // TODO ensure resources are cleaned up for GC.
+   synchronized void stop() throws Exception
+   {
+      if (stopped)
+      {
+         return;
+      }
+
+      protocolHandler.stop(false);
+      // TODO this should pass in clean session.
+      subscriptionManager.stop(false);
+      mqttPublishManager.stop(false);
+
+      if (serverSession != null)
+      {
+         serverSession.stop();
+         serverSession.close(false);
+      }
+
+      if (state != null)
+      {
+         state.setAttached(false);
+      }
+
+      stopped = true;
+   }
+
+   /** @return {@code true} once {@link #stop()} has completed and {@link #start()} has not run since */
+   boolean getStopped()
+   {
+      return stopped;
+   }
+
+   /** @return the publish manager created for this session */
+   MQTTPublishManager getMqttPublishManager()
+   {
+      return mqttPublishManager;
+   }
+
+   /** @return the current session state, or {@code null} when none has been set */
+   MQTTSessionState getState()
+   {
+      return state;
+   }
+
+   /** @return the connection manager created for this session */
+   MQTTConnectionManager getConnectionManager()
+   {
+      return mqttConnectionManager;
+   }
+
+   /** @return the same value as {@link #getState()} */
+   MQTTSessionState getSessionState()
+   {
+      return state;
+   }
+
+   /** @return the server session, or {@code null} when none has been set */
+   ServerSessionImpl getServerSession()
+   {
+      return serverSession;
+   }
+
+   /** @return the server reported by the protocol handler */
+   ActiveMQServer getServer()
+   {
+      return protocolHandler.getServer();
+   }
+
+   /** @return the subscription manager created for this session */
+   MQTTSubscriptionManager getSubscriptionManager()
+   {
+      return subscriptionManager;
+   }
+
+   /** @return the protocol handler passed to the constructor */
+   MQTTProtocolHandler getProtocolHandler()
+   {
+      return protocolHandler;
+   }
+
+   /** @return the callback created for this session */
+   SessionCallback getSessionCallback()
+   {
+      return sessionCallback;
+   }
+
+   /** @param serverSession server session to stop and close when this session stops */
+   void setServerSession(ServerSessionImpl serverSession)
+   {
+      this.serverSession = serverSession;
+   }
+
+   /**
+    * Stores the given state and marks it as attached.
+    *
+    * @param state state to attach to this session
+    */
+   void setSessionState(MQTTSessionState state)
+   {
+      this.state = state;
+      state.setAttached(true);
+   }
+
+   /** @return the retain-message manager created for this session */
+   MQTTRetainMessageManager getRetainMessageManager()
+   {
+      return retainMessageManager;
+   }
+
+   /** @return the connection passed to the constructor */
+   MQTTConnection getConnection()
+   {
+      return connection;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
new file mode 100644
index 0000000..63e19a5
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+
+public class MQTTSessionCallback implements SessionCallback
+{
+ private MQTTSession session;
+
+ private MQTTLogger log = MQTTLogger.LOGGER;
+
+ public MQTTSessionCallback(MQTTSession session) throws Exception
+ {
+ this.session = session;
+ }
+
+ @Override
+ public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount)
+ {
+ try
+ {
+ session.getMqttPublishManager().sendMessage(message, consumer, deliveryCount);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
+ }
+ return 1;
+ }
+
+ @Override
+ public int sendLargeMessageContinuation(ServerConsumer consumerID, byte[] body, boolean continues, boolean requiresResponse)
+ {
+ log.warn("Sending LARGE MESSAGE");
+ return 1;
+ }
+
+ @Override
+ public void addReadyListener(ReadyListener listener)
+ {
+ session.getConnection().getTransportConnection().addReadyListener(listener);
+ }
+
+ @Override
+ public void removeReadyListener(ReadyListener listener)
+ {
+ session.getConnection().getTransportConnection().removeReadyListener(listener);
+ }
+
+ @Override
+ public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount)
+ {
+ return sendMessage(message, consumer, deliveryCount);
+ }
+
+ @Override
+ public void disconnect(ServerConsumer consumer, String queueName)
+ {
+ try
+ {
+ consumer.removeItself();
+ }
+ catch (Exception e)
+ {
+ log.error(e.getMessage());
+ }
+ }
+
+ @Override
+ public boolean hasCredits(ServerConsumer consumerID)
+ {
+ return true;
+ }
+
+ @Override
+ public void sendProducerCreditsMessage(int credits, SimpleString address)
+ {
+ }
+
+ @Override
+ public void sendProducerCreditsFailMessage(int credits, SimpleString address)
+ {
+ }
+
+ @Override
+ public void closed()
+ {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
new file mode 100644
index 0000000..b7fa436
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * State kept for one MQTT client id: the will message, the subscriptions, and the bookkeeping
+ * that links MQTT packet ids to server message ids.
+ */
+public class MQTTSessionState
+{
+   private String clientId;
+
+   private ServerMessage willMessage;
+
+   // Subscriptions keyed by the MQTT topic name of the subscription.
+   private final ConcurrentHashMap<String, MqttTopicSubscription> subscriptions = new ConcurrentHashMap<>();
+
+   // Used to store Packet ID of Publish QoS1 and QoS2 message. See spec: 4.3.3 QoS 2: Exactly once delivery. Method B.
+   private final Map<Integer, MQTTMessageInfo> messageRefStore;
+
+   // address -> (server message id -> MQTT packet id). Entries are created by addSubscription()
+   // under the core-converted form of the topic filter.
+   private final Map<String, Map<Long, Integer>> addressMessageMap;
+
+   private final Set<Integer> pubRec;
+
+   private final Set<Integer> pub;
+
+   private boolean attached = false;
+
+   private MQTTLogger log = MQTTLogger.LOGGER;
+
+   // Objects track the Outbound message references
+   // MQTT packet id -> (address, server message id)
+   private final Map<Integer, Pair<String, Long>> outboundMessageReferenceStore;
+
+   // address -> (server message id -> MQTT packet id); only filled for QoS 2 references.
+   private final ConcurrentHashMap<String, ConcurrentHashMap<Long, Integer>> reverseOutboundReferenceStore;
+
+   private final Object outboundLock = new Object();
+
+   // FIXME We should use a better mechanism for creating packet IDs.
+   private final AtomicInteger lastId = new AtomicInteger(0);
+
+   /**
+    * @param clientId client id this state belongs to
+    */
+   public MQTTSessionState(String clientId)
+   {
+      this.clientId = clientId;
+
+      pubRec = new HashSet<>();
+      pub = new HashSet<>();
+
+      outboundMessageReferenceStore = new ConcurrentHashMap<>();
+      reverseOutboundReferenceStore = new ConcurrentHashMap<>();
+
+      messageRefStore = new ConcurrentHashMap<>();
+      addressMessageMap = new ConcurrentHashMap<>();
+   }
+
+   /**
+    * Produces the next packet id in the range {@code 1..Short.MAX_VALUE}, wrapping back to 1
+    * after {@code Short.MAX_VALUE} has been handed out.
+    * <p>
+    * The successor is computed and published in a single compare-and-set, so concurrent callers
+    * can neither receive the same id nor push the counter past the upper bound.
+    *
+    * @return the next packet id
+    */
+   int generateId()
+   {
+      while (true)
+      {
+         int current = lastId.get();
+         int next = current >= Short.MAX_VALUE ? 1 : current + 1;
+         if (lastId.compareAndSet(current, next))
+         {
+            return next;
+         }
+         // Another thread advanced the counter in between; retry with the fresh value.
+      }
+   }
+
+   /**
+    * Records an outbound message reference under its MQTT packet id. For QoS 2 the reverse
+    * mapping (address and server message id to packet id) is recorded as well.
+    *
+    * @param mqttId          MQTT packet id
+    * @param address         address of the message
+    * @param serverMessageId id of the message on the server
+    * @param qos             QoS level; only the value 2 triggers the reverse mapping
+    */
+   void addOutbandMessageRef(int mqttId, String address, long serverMessageId, int qos)
+   {
+      synchronized (outboundLock)
+      {
+         outboundMessageReferenceStore.put(mqttId, new Pair<String, Long>(address, serverMessageId));
+         if (qos == 2)
+         {
+            ConcurrentHashMap<Long, Integer> serverToMqttId = reverseOutboundReferenceStore.get(address);
+            if (serverToMqttId == null)
+            {
+               serverToMqttId = new ConcurrentHashMap<Long, Integer>();
+               reverseOutboundReferenceStore.put(address, serverToMqttId);
+            }
+            serverToMqttId.put(serverMessageId, mqttId);
+         }
+      }
+   }
+
+   /**
+    * Removes the outbound reference stored under the given packet id.
+    *
+    * @param mqttId MQTT packet id
+    * @param qos    QoS level; for any value other than 1 the reverse mapping is cleaned up too
+    * @return the removed (address, server message id) pair; {@code null} when no reference is
+    *         stored under {@code mqttId}, or when {@code qos} is not 1 and no reverse mapping
+    *         exists for the address
+    */
+   Pair<String, Long> removeOutbandMessageRef(int mqttId, int qos)
+   {
+      synchronized (outboundLock)
+      {
+         Pair<String, Long> messageInfo = outboundMessageReferenceStore.remove(mqttId);
+         // An unknown packet id has nothing further to clean up; QoS 1 has no reverse mapping.
+         if (messageInfo == null || qos == 1)
+         {
+            return messageInfo;
+         }
+
+         Map<Long, Integer> map = reverseOutboundReferenceStore.get(messageInfo.getA());
+         if (map != null)
+         {
+            map.remove(messageInfo.getB());
+            if (map.isEmpty())
+            {
+               reverseOutboundReferenceStore.remove(messageInfo.getA());
+            }
+            return messageInfo;
+         }
+         return null;
+      }
+   }
+
+   /** @return the live, mutable set of PUBREC packet ids (a plain {@link HashSet}) */
+   Set<Integer> getPubRec()
+   {
+      return pubRec;
+   }
+
+   /** @return the live, mutable set of PUBLISH packet ids (a plain {@link HashSet}) */
+   Set<Integer> getPub()
+   {
+      return pub;
+   }
+
+   /** @return the value last passed to {@link #setAttached(boolean)}, initially {@code false} */
+   boolean getAttached()
+   {
+      return attached;
+   }
+
+   void setAttached(boolean attached)
+   {
+      this.attached = attached;
+   }
+
+   /** @return {@code true} when a will message is currently set */
+   boolean isWill()
+   {
+      return willMessage != null;
+   }
+
+   /** @return the will message, or {@code null} when none is set */
+   ServerMessage getWillMessage()
+   {
+      return willMessage;
+   }
+
+   void setWillMessage(ServerMessage willMessage)
+   {
+      this.willMessage = willMessage;
+   }
+
+   /** Clears the will message. */
+   void deleteWillMessage()
+   {
+      willMessage = null;
+   }
+
+   /** @return a live view of the current subscriptions */
+   Collection<MqttTopicSubscription> getSubscriptions()
+   {
+      return subscriptions.values();
+   }
+
+   /**
+    * Registers a subscription. A subscription for a topic that is already present only replaces
+    * the existing one when it asks for a higher QoS.
+    *
+    * @param subscription subscription to add
+    * @return {@code true} when the subscription was stored, {@code false} when an existing one
+    *         with equal or higher QoS was kept
+    */
+   boolean addSubscription(MqttTopicSubscription subscription)
+   {
+      synchronized (subscriptions)
+      {
+         addressMessageMap.putIfAbsent(MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName()), new ConcurrentHashMap<Long, Integer>());
+
+         MqttTopicSubscription existingSubscription = subscriptions.get(subscription.topicName());
+         if (existingSubscription == null
+            || subscription.qualityOfService().value() > existingSubscription.qualityOfService().value())
+         {
+            subscriptions.put(subscription.topicName(), subscription);
+            return true;
+         }
+      }
+      return false;
+   }
+
+   /**
+    * Removes the subscription stored under the given MQTT topic name together with its
+    * per-address message map.
+    *
+    * @param address MQTT topic name the subscription was added with
+    */
+   void removeSubscription(String address)
+   {
+      synchronized (subscriptions)
+      {
+         subscriptions.remove(address);
+         // addSubscription() files the per-address map under the core-converted filter, so the
+         // same conversion is needed here; the raw MQTT filter would leave the entry behind.
+         addressMessageMap.remove(MQTTUtil.convertMQTTAddressFilterToCore(address));
+      }
+   }
+
+   /** @return the subscription stored under the given MQTT topic name, or {@code null} */
+   MqttTopicSubscription getSubscription(String address)
+   {
+      return subscriptions.get(address);
+   }
+
+   String getClientId()
+   {
+      return clientId;
+   }
+
+   void setClientId(String clientId)
+   {
+      this.clientId = clientId;
+   }
+
+   /**
+    * Stores the message info under the given packet id.
+    *
+    * @param mqttId       MQTT packet id
+    * @param messageInfo  info to store
+    * @param storeAddress when {@code true}, the server message id is also mapped to the packet id
+    *                     in the per-address map, provided one exists for the info's address
+    */
+   void storeMessageRef(Integer mqttId, MQTTMessageInfo messageInfo, boolean storeAddress)
+   {
+      messageRefStore.put(mqttId, messageInfo);
+      if (storeAddress)
+      {
+         Map<Long, Integer> addressMap = addressMessageMap.get(messageInfo.getAddress());
+         if (addressMap != null)
+         {
+            addressMap.put(messageInfo.getServerMessageId(), mqttId);
+         }
+      }
+   }
+
+   /**
+    * Removes the message info stored under the given packet id and, when present, its entry in
+    * the per-address map. Unknown packet ids are ignored.
+    *
+    * @param mqttId MQTT packet id
+    */
+   void removeMessageRef(Integer mqttId)
+   {
+      MQTTMessageInfo info = messageRefStore.remove(mqttId);
+      if (info != null)
+      {
+         Map<Long, Integer> addressMap = addressMessageMap.get(info.getAddress());
+         if (addressMap != null)
+         {
+            addressMap.remove(info.getServerMessageId());
+         }
+      }
+   }
+
+   /** @return the message info stored under the given packet id, or {@code null} */
+   MQTTMessageInfo getMessageInfo(Integer mqttId)
+   {
+      return messageRefStore.get(mqttId);
+   }
+}