You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/25 20:53:04 UTC
svn commit: r1390048 [1/2] - in /activemq/activemq-apollo/trunk: ./
apollo-cli/ apollo-distro/ apollo-distro/src/main/descriptors/
apollo-distro/src/main/release/examples/mqtt/java/ apollo-mqtt/
apollo-mqtt/src/ apollo-mqtt/src/main/ apollo-mqtt/src/ma...
Author: chirino
Date: Tue Sep 25 18:53:01 2012
New Revision: 1390048
URL: http://svn.apache.org/viewvc?rev=1390048&view=rev
Log:
Fixes APLO-260 : Contribute the FuseSource MQTT impl to Apache and included as part of the default Apollo distribution.
Added:
activemq/activemq-apollo/trunk/apollo-mqtt/
activemq/activemq-apollo/trunk/apollo-mqtt/pom.xml
activemq/activemq-apollo/trunk/apollo-mqtt/readme.md
activemq/activemq-apollo/trunk/apollo-mqtt/src/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/proto/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/proto/data.proto
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/META-INF/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/META-INF/services/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/META-INF/services/org.apache.activemq.apollo/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/org/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/org/apache/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/org/apache/activemq/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/org/apache/activemq/apollo/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/org/apache/activemq/apollo/mqtt/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/org/apache/activemq/apollo/mqtt/dto/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/org/apache/activemq/apollo/mqtt/dto/jaxb.index
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocol.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/MqttConnectionStatusDTO.java
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/MqttDTO.java
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/package-info.java
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/webapp/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/webapp/WEB-INF/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/webapp/WEB-INF/org/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/webapp/WEB-INF/org/apache/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/webapp/WEB-INF/org/apache/activemq/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/webapp/WEB-INF/org/apache/activemq/apollo/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/webapp/WEB-INF/org/apache/activemq/apollo/mqtt/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/webapp/WEB-INF/org/apache/activemq/apollo/mqtt/dto/
activemq/activemq-apollo/trunk/apollo-mqtt/src/main/webapp/WEB-INF/org/apache/activemq/apollo/mqtt/dto/MqttConnectionStatusDTO.jade
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/ide-resources/
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/ide-resources/log4j.properties (with props)
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/apollo-mqtt-bdb.xml
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/apollo-mqtt-leveldb.xml
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/apollo-mqtt.xml
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/apollo.ks (with props)
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/client.ks (with props)
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/log4j.properties
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/login.config
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/org/
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/org/apache/
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/org/apache/activemq/
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/org/apache/activemq/apollo/
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/org/apache/activemq/apollo/mqtt/
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/org/apache/activemq/apollo/mqtt/dto/
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/org/apache/activemq/apollo/mqtt/dto/simple.xml
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/resources/users.properties
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/dto/
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/dto/XmlCodecTest.java
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttCleanSessionTest.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttConnectionTest.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttExistingSessionOnBDBTest.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttExistingSessionOnLevelDBTest.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttExistingSessionTest.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttQosTest.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttStompInteropTest.scala
activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttTestSupport.scala
Modified:
activemq/activemq-apollo/trunk/apollo-cli/pom.xml
activemq/activemq-apollo/trunk/apollo-distro/pom.xml
activemq/activemq-apollo/trunk/apollo-distro/src/main/descriptors/common-bin.xml
activemq/activemq-apollo/trunk/apollo-distro/src/main/release/examples/mqtt/java/readme.md
activemq/activemq-apollo/trunk/pom.xml
Modified: activemq/activemq-apollo/trunk/apollo-cli/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/pom.xml?rev=1390048&r1=1390047&r2=1390048&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-cli/pom.xml Tue Sep 25 18:53:01 2012
@@ -141,6 +141,12 @@
<version>99-trunk-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-mqtt</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
Modified: activemq/activemq-apollo/trunk/apollo-distro/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-distro/pom.xml?rev=1390048&r1=1390047&r2=1390048&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-distro/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-distro/pom.xml Tue Sep 25 18:53:01 2012
@@ -73,6 +73,11 @@
<artifactId>apollo-openwire</artifactId>
<version>99-trunk-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-mqtt</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
Modified: activemq/activemq-apollo/trunk/apollo-distro/src/main/descriptors/common-bin.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-distro/src/main/descriptors/common-bin.xml?rev=1390048&r1=1390047&r2=1390048&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-distro/src/main/descriptors/common-bin.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-distro/src/main/descriptors/common-bin.xml Tue Sep 25 18:53:01 2012
@@ -40,6 +40,9 @@
<include>org.fusesource.leveldbjni:*</include>
<include>org.fusesource.hawtjni:*</include>
+ <!-- for mqtt support -->
+ <include>org.fusesource.mqtt-client:*</include>
+
<!-- for XPATH selector support -->
<include>xalan:xalan</include>
Modified: activemq/activemq-apollo/trunk/apollo-distro/src/main/release/examples/mqtt/java/readme.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-distro/src/main/release/examples/mqtt/java/readme.md?rev=1390048&r1=1390047&r2=1390048&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-distro/src/main/release/examples/mqtt/java/readme.md (original)
+++ activemq/activemq-apollo/trunk/apollo-distro/src/main/release/examples/mqtt/java/readme.md Tue Sep 25 18:53:01 2012
@@ -6,7 +6,6 @@ This is an example of how use the MQTT p
- Install Java SDK
- Install [Maven](http://maven.apache.org/download.html)
-- Install the [MQTT protocol for Apollo](https://github.com/fusesource/fuse-extra/tree/master/fusemq-apollo/fusemq-apollo-mqtt)
## Building
Added: activemq/activemq-apollo/trunk/apollo-mqtt/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/pom.xml?rev=1390048&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/pom.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/pom.xml Tue Sep 25 18:53:01 2012
@@ -0,0 +1,182 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-scala</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ <relativePath>../apollo-scala</relativePath>
+ </parent>
+
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-mqtt</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <name>${project.artifactId}</name>
+ <description>MQTT messaging protocol</description>
+
+ <properties>
+ <!--
+ <karaf-version>2.2.2</karaf-version>
+ <jetty-version>7.5.1.v20110908</jetty-version>
+ <scalate-version>1.5.3</scalate-version>
+ -->
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-broker</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.fusesource.mqtt-client</groupId>
+ <artifactId>mqtt-client</artifactId>
+ <version>${mqtt-client-version}</version>
+ </dependency>
+
+ <!-- Testing Dependencies -->
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-broker</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-util</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-bdb</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-leveldb</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-stomp</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apollo-web</artifactId>
+ <version>99-trunk-SNAPSHOT</version>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.jetty.aggregate</groupId>
+ <artifactId>jetty-all-server</artifactId>
+ <version>${jetty-version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.fusesource.stompjms</groupId>
+ <artifactId>stompjms-client</artifactId>
+ <version>${stompjms-version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ <version>${junit-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_2.9.1</artifactId>
+ <version>${scalatest-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j-version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <!-- Generate a test jar for the test cases in this package -->
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.fusesource.scalate</groupId>
+ <artifactId>maven-scalate-plugin</artifactId>
+ <version>${scalate-version}</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>precompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.fusesource.hawtbuf</groupId>
+ <artifactId>hawtbuf-protoc</artifactId>
+ <version>${hawtbuf-version}</version>
+ <configuration>
+ <type>alt</type>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+</project>
Added: activemq/activemq-apollo/trunk/apollo-mqtt/readme.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/readme.md?rev=1390048&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/readme.md (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/readme.md Tue Sep 25 18:53:01 2012
@@ -0,0 +1,30 @@
+# The MQTT Protocol for Apollo
+
+## Overview
+
+This module adds MQTT v3.1 protocol support to Apache Apollo message brokers.
+All MQTT v3.1 feature are supported:
+
+* QoS 0, 1, and 2
+* Retained messages
+* Clean and non-clean sessions
+* Client authentication
+
+## Validating the Installation
+
+You can use the simple MQTT listener and publisher command line apps included
+in the mqtt-client library. To use, download the
+[mqtt-client-1.2-uber.jar][client_release_jar] then in a command line
+window, run a MQTT message listener on the `test` topic on your local apollo broker
+by running:
+
+ java -cp mqtt-client-1.2-uber.jar org.fusesource.mqtt.cli.Listener -h tcp://localhost:61613 -u admin -p password -t test
+
+Then in a seperate command line window then run a publisher to send a `hello` message
+to the `test` topic by running:
+
+ java -cp mqtt-client-1.2-uber.jar org.fusesource.mqtt.cli.Publisher -h tcp://localhost:61613 -u admin -p password -t test -m hello
+
+Your listener's command line process should then print to the screen the `hello` message.
+
+[client_release_jar]: http://repo.fusesource.com/nexus/content/repositories/public/org/fusesource/mqtt-client/mqtt-client/1.2/mqtt-client-1.2-uber.jar
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/proto/data.proto?rev=1390048&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/proto/data.proto (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/proto/data.proto Tue Sep 25 18:53:01 2012
@@ -0,0 +1,31 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+package org.apache.activemq.apollo.mqtt;
+
+option java_multiple_files = true;
+
+message TopicPB {
+ optional bytes name = 1 [java_override_type = "UTF8Buffer"];
+ optional int32 qos = 2;
+ optional bytes address = 3 [java_override_type = "UTF8Buffer"];
+}
+
+message SessionPB {
+ optional bytes client_id = 1 [java_override_type = "UTF8Buffer"];
+ repeated TopicPB subscriptions = 2;
+ repeated int32 received_message_ids = 3;
+}
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index?rev=1390048&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index Tue Sep 25 18:53:01 2012
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+org.apache.activemq.apollo.mqtt.dto.Module
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index?rev=1390048&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index Tue Sep 25 18:53:01 2012
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+org.apache.activemq.apollo.mqtt.MqttProtocolCodecFactory
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index?rev=1390048&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index Tue Sep 25 18:53:01 2012
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+org.apache.activemq.apollo.mqtt.MqttProtocol
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/org/apache/activemq/apollo/mqtt/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/org/apache/activemq/apollo/mqtt/dto/jaxb.index?rev=1390048&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/org/apache/activemq/apollo/mqtt/dto/jaxb.index (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/resources/org/apache/activemq/apollo/mqtt/dto/jaxb.index Tue Sep 25 18:53:01 2012
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+MqttDTO
+MqttConnectionStatusDTO
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocol.scala?rev=1390048&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocol.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocol.scala Tue Sep 25 18:53:01 2012
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt
+
+import org.apache.activemq.apollo.broker._
+import java.lang.String
+import protocol.{ProtocolCodecFactory, ProtocolFactory, Protocol}
+import org.apache.activemq.apollo.broker.store._
+import org.fusesource.mqtt.codec.MQTTProtocolCodec
+import org.fusesource.hawtbuf._
+import org.fusesource.hawtbuf.Buffer._
+import org.fusesource.hawtdispatch.transport.ProtocolCodec
+
+/**
+ * Creates MqttCodec objects that encode/decode the
+ * <a href="http://activemq.apache.org/mqtt/">Mqtt</a> protocol.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class MqttProtocolCodecFactory extends ProtocolCodecFactory.Provider {
+
+ def id = "mqtt"
+ def createProtocolCodec(connector: Connector) = {
+ val rc = new MQTTProtocolCodec()
+ rc.setBufferPools(Broker.buffer_pools)
+ rc
+ }
+ def isIdentifiable() = true
+
+ //
+ // An MQTT CONNECT message has between 10-13 bytes:
+ // Message Type : 0x10 @ [0]
+ // Remaining Length : Byte{1-4} @ [1]
+ // Protocol Name : 0x00 0x06 'M' 'Q' 'I' 's' 'd' 'p' @ [2|3|4|5]
+ //
+ val HEAD_MAGIC = new Buffer(Array[Byte](0x10 ))
+ val TAIL_MAGIC = new Buffer(Array[Byte](0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p'))
+
+ def maxIdentificaionLength() = 13;
+ def matchesIdentification(header: Buffer):Boolean = {
+ if (header.length < 10) {
+ false
+ } else {
+ header.startsWith(HEAD_MAGIC) && header.indexOf(TAIL_MAGIC, 2) < 6
+ }
+ }
+}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object MqttProtocol extends MqttProtocolCodecFactory with Protocol {
+
+ val destination_parser = new DestinationParser
+ destination_parser.queue_prefix = null
+ destination_parser.topic_prefix = null
+ destination_parser.path_separator = "/"
+ destination_parser.any_child_wildcard = "+"
+ destination_parser.any_descendant_wildcard = "#"
+ destination_parser.dsub_prefix = null
+ destination_parser.temp_queue_prefix = null
+ destination_parser.temp_topic_prefix = null
+ destination_parser.destination_separator = null
+ destination_parser.regex_wildcard_end = null
+ destination_parser.regex_wildcard_end = null
+ destination_parser.part_pattern = null
+
+ val PROTOCOL_ID = ascii(id)
+ def createProtocolHandler = new MqttProtocolHandler
+
+}
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala?rev=1390048&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala Tue Sep 25 18:53:01 2012
@@ -0,0 +1,1292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt
+
+import org.fusesource.hawtbuf._
+import dto.{MqttConnectionStatusDTO, MqttDTO}
+import org.fusesource.hawtdispatch._
+
+import org.apache.activemq.apollo.broker._
+import java.lang.String
+import protocol._
+import protocol.RawMessage
+import security.SecurityContext
+import org.apache.activemq.apollo.util._
+import java.util.concurrent.TimeUnit
+import java.util.Map.Entry
+import java.io.IOException
+import org.apache.activemq.apollo.dto._
+import java.util.regex.Pattern
+import org.fusesource.mqtt.client.QoS._
+import org.fusesource.mqtt.client.Topic
+import org.fusesource.mqtt.codec.CONNACK.Code._
+import org.fusesource.mqtt.client.QoS
+import org.fusesource.hawtdispatch.transport.HeartBeatMonitor
+import org.apache.activemq.apollo.util.path.{Path, PathParser, PathMap}
+import org.fusesource.mqtt.codec._
+import scala.collection.mutable.{HashSet, HashMap}
+import org.apache.activemq.apollo.mqtt.MqttSessionManager._
+import org.apache.activemq.apollo.broker.store.{Store, StoreUOW}
+import scala.Array._
+import scala.Some
+import org.apache.activemq.apollo.mqtt.MqttSessionManager.HostState
+import org.apache.activemq.apollo.broker.SubscriptionAddress
+
+object MqttProtocolHandler extends Log {
+
+ case class Request(id:Short, message:MessageSupport.Message, ack:(DeliveryResult)=>Unit) {
+ val frame = if(message==null) null else message.encode()
+ var delivered = false
+ }
+
+ def received[T](value:T):T = {
+ trace("received: %s", value)
+ value
+ }
+
+ val WAITING_ON_CLIENT_REQUEST = ()=> "client request"
+
+ object SessionDeliverySizer extends Sizer[(Session[Delivery], Delivery)] {
+ def size(value: (Session[Delivery], Delivery)) = Delivery.size(value._2)
+ }
+}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class MqttProtocolHandler extends ProtocolHandler {
+ import MqttProtocolHandler._
+
+ def protocol = "mqtt"
+
+ def broker = connection.connector.broker
+ def queue = connection.dispatch_queue
+
+ var connection_log:Log = MqttProtocolHandler
+ var config:MqttDTO = _
+
+ def destination_parser = {
+ var destination_parser = MqttProtocol.destination_parser
+ if( config.queue_prefix!=null ||
+ config.path_separator!= null ||
+ config.any_child_wildcard != null ||
+ config.any_descendant_wildcard!= null ||
+ config.regex_wildcard_start!= null ||
+ config.regex_wildcard_end!= null ||
+ config.part_pattern!= null
+ ) {
+ destination_parser = new DestinationParser().copy(destination_parser)
+ if( config.queue_prefix!=null ) { destination_parser.queue_prefix = config.queue_prefix }
+ if( config.path_separator!=null ) { destination_parser.path_separator = config.path_separator }
+ if( config.any_child_wildcard!=null ) { destination_parser.any_child_wildcard = config.any_child_wildcard }
+ if( config.any_descendant_wildcard!=null ) { destination_parser.any_descendant_wildcard = config.any_descendant_wildcard }
+ if( config.regex_wildcard_start!=null ) { destination_parser.regex_wildcard_start = config.regex_wildcard_start }
+ if( config.regex_wildcard_end!=null ) { destination_parser.regex_wildcard_end = config.regex_wildcard_end }
+ if( config.part_pattern!=null ) { destination_parser.part_pattern = Pattern.compile(config.part_pattern) }
+ }
+ destination_parser
+ }
+
+ var protocol_filters = List[ProtocolFilter2]()
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Bits related setting up a client connection
+ //
+ /////////////////////////////////////////////////////////////////////
+ def session_id = security_context.session_id
+
+ val security_context = new SecurityContext
+ var sink_manager:SinkMux[Request] = null
+ var connection_sink:Sink[Request] = null
+ var codec:MQTTProtocolCodec = _
+
+ override def on_transport_connected() = {
+ import collection.JavaConversions._
+
+ codec = connection.transport.getProtocolCodec.asInstanceOf[MQTTProtocolCodec]
+ val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
+ config = connector_config.protocols.find( _.isInstanceOf[MqttDTO]).map(_.asInstanceOf[MqttDTO]).getOrElse(new MqttDTO)
+ import OptionSupport._
+ config.max_message_length.foreach( codec.setMaxMessageLength(_) )
+
+ import collection.JavaConversions._
+ protocol_filters = ProtocolFilter2.create_filters(config.protocol_filters.toList, this)
+
+ security_context.local_address = connection.transport.getLocalAddress
+ security_context.remote_address = connection.transport.getRemoteAddress
+ security_context.connector_id = connection.connector.id
+ security_context.certificates = connection.certificates
+
+ connection_log = connection.connector.broker.connection_log
+ var filtering_sink:Sink[Request] = connection.transport_sink.map { request =>
+ trace("sent: %s", request.message)
+ request.delivered = true
+ if (request.id == 0 && request.ack != null) {
+ request.ack(Consumed)
+ }
+ request.frame
+ }
+ if(!protocol_filters.isEmpty) {
+ filtering_sink = filtering_sink.flatMap {x=>
+ var cur = Option(x)
+ protocol_filters.foreach { filter =>
+ cur = cur.flatMap(filter.filter_outbound(_))
+ }
+ cur
+ }
+ }
+ sink_manager = new SinkMux[Request](filtering_sink)
+ connection_sink = new OverflowSink(sink_manager.open());
+ resume_read
+ }
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Bits related tearing down a client connection
+ //
+ /////////////////////////////////////////////////////////////////////
+ var closed = false
+ def dead_handler(command:AnyRef):Unit = {}
+
+ override def on_transport_disconnected() = {
+ if( !closed ) {
+ closed=true;
+ dead = true;
+ command_handler = dead_handler _
+
+ security_context.logout( e => {
+ if(e!=null) {
+ connection_log.info(e, "MQTT connection '%s' log out error: %s", security_context.remote_address, e.toString)
+ }
+ })
+
+ heart_beat_monitor.stop
+ if( !connection.stopped ) {
+ connection.stop(NOOP)
+ }
+ trace("mqtt protocol resources released")
+ }
+ }
+
+ override def on_transport_failure(error: IOException) = {
+ if( !dead ) {
+ command_handler("failure")
+ dead = true
+ command_handler = dead_handler _
+ if( !connection.stopped ) {
+ connection_log.info(error, "Shutting connection '%s' down due to: %s", security_context.remote_address, error)
+ super.on_transport_failure(error);
+ }
+ }
+ }
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Bits related managing connection flow control
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ var status = WAITING_ON_CLIENT_REQUEST
+ def suspend_read(reason: => String) = {
+ status = reason _
+ connection.transport.suspendRead
+ heart_beat_monitor.suspendRead
+ }
+
+ def resume_read() = {
+ status = WAITING_ON_CLIENT_REQUEST
+ connection.transport.resumeRead
+ heart_beat_monitor.resumeRead
+ }
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Bits related to raising connection failure signals
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ var dead = false
+ def die_delay = {
+ import OptionSupport._
+ config.die_delay.getOrElse(1000*5L)
+ }
+
+ class Break extends RuntimeException
+
+ def async_die(msg:String, e:Throwable=null):Unit = try {
+ die(msg, e)
+ } catch {
+ case x:Break=>
+ }
+
+ def async_die(response:MessageSupport.Message, msg:String):Unit = try {
+ die(response, msg, null)
+ } catch {
+ case x:Break=>
+ }
+
+ def die[T](msg:String):T = die(null, msg, null)
+ def die[T](msg:String, e:Throwable):T = die(null, msg, e)
+ def die[T](response:MessageSupport.Message, msg:String):T = die(response, msg, null)
+ def die[T](response:MessageSupport.Message, msg:String, e:Throwable):T = {
+ if( e!=null) {
+ connection_log.info(e, "MQTT connection '%s' error: %s", security_context.remote_address, msg, e)
+ } else {
+ connection_log.info("MQTT connection '%s' error: %s", security_context.remote_address, msg)
+ }
+ die(response)
+ }
+
+ def die[T](response:MessageSupport.Message):T = {
+ if( !dead ) {
+ command_handler("failure")
+ dead = true
+ command_handler = dead_handler _
+ status = ()=>"shuting down"
+ if( response!=null ) {
+ connection.transport.resumeRead
+ connection_sink.offer(Request(0, response, null))
+ // TODO: if there are too many open connections we should just close the connection
+ // without waiting for the error to get sent to the client.
+ queue.after(die_delay, TimeUnit.MILLISECONDS) {
+ connection.stop(NOOP)
+ }
+ } else {
+ connection.stop(NOOP)
+ }
+ }
+ throw new Break()
+ }
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Bits for dispatching client requests.
+ //
+ /////////////////////////////////////////////////////////////////////
+ var command_handler: (AnyRef)=>Unit = connect_handler _
+
+ override def on_transport_command(command:AnyRef):Unit = {
+ try {
+
+ var f = command
+ val frame = if(!protocol_filters.isEmpty) {
+ var cur = Option(f)
+ protocol_filters.foreach { filter =>
+ cur = cur.flatMap(filter.filter_inbound(_))
+ }
+ cur match {
+ case Some(f) => f
+ case None => return // dropping the frame.
+ }
+ } else {
+ f
+ }
+
+ command_handler(frame)
+ } catch {
+ case e: Break =>
+ case e:Exception =>
+ // To avoid double logging to the same log category..
+ var msg: String = "Internal Server Error: " + e
+ if( connection_log!=MqttProtocolHandler ) {
+ // but we also want the error on the apollo.log file.
+ warn(e, msg)
+ }
+ async_die(msg, e);
+ }
+ }
+
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Bits related establishing the client connection
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ var connect_message:CONNECT = _
+ var heart_beat_monitor = new HeartBeatMonitor
+ var host:VirtualHost = _
+
+ def connect_handler(command:AnyRef):Unit = command match {
+ case s:MQTTProtocolCodec =>
+ // this is passed on to us by the protocol discriminator
+ // so we know which wire format is being used.
+ case command:MQTTFrame=>
+
+ command.messageType() match {
+ case CONNECT.TYPE =>
+ connect_message = received(new CONNECT().decode(command))
+ on_mqtt_connect
+ case _ =>
+ die("Expecting an MQTT CONNECT message, but got: "+command.getClass);
+ }
+ case "failure" =>
+ case _=>
+ die("Internal Server Error: unexpected mqtt command: "+command.getClass);
+ }
+
+ def on_mqtt_connect:Unit = {
+
+ val connack = new CONNACK
+
+ if(connect_message.version!=3) {
+ connack.code(CONNECTION_REFUSED_UNACCEPTED_PROTOCOL_VERSION)
+ die(connack, "Unsupported protocol version: "+connect_message.version)
+ }
+
+ val client_id = connect_message.clientId()
+ security_context.user = Option(connect_message.userName).map(_.toString).getOrElse(null)
+ security_context.password = Option(connect_message.password).map(_.toString).getOrElse(null)
+ security_context.session_id = Some(client_id.toString)
+
+ val keep_alive = connect_message.keepAlive
+ if( keep_alive > 0 ) {
+ heart_beat_monitor.setReadInterval((keep_alive*1.5).toLong*1000)
+ heart_beat_monitor.setOnDead(^{
+ async_die("Missed keep alive set to "+keep_alive+" seconds")
+ });
+ }
+ heart_beat_monitor.suspendRead()
+ heart_beat_monitor.setTransport(connection.transport)
+ heart_beat_monitor.start
+
+ suspend_read("virtual host lookup")
+ broker.dispatch_queue {
+ host = connection.connector.broker.get_default_virtual_host
+ queue {
+ resume_read
+ if(host==null) {
+ connack.code(CONNECTION_REFUSED_SERVER_UNAVAILABLE)
+ async_die(connack, "Default virtual host not found.")
+ } else if(!host.service_state.is_started) {
+ connack.code(CONNECTION_REFUSED_SERVER_UNAVAILABLE)
+ async_die(connack, "Default virtual host stopped.")
+ } else {
+ connection_log = host.connection_log
+ if( host.authenticator!=null && host.authorizer!=null ) {
+ suspend_read("authenticating and authorizing connect")
+ host.authenticator.authenticate(security_context) { auth_err =>
+ queue {
+ if( auth_err!=null ) {
+ connack.code(CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD)
+ async_die(connack, auth_err+". Credentials="+security_context.credential_dump)
+ } else if( !host.authorizer.can(security_context, "connect", connection.connector) ) {
+ connack.code(CONNECTION_REFUSED_NOT_AUTHORIZED)
+ async_die(connack, "Not authorized to connect to connector '%s'. Principals=".format(connection.connector.id, security_context.principal_dump))
+ } else if( !host.authorizer.can(security_context, "connect", host) ) {
+ connack.code(CONNECTION_REFUSED_NOT_AUTHORIZED)
+ async_die(connack, "Not authorized to connect to virtual host '%s'. Principals=".format(host.id, security_context.principal_dump))
+ } else {
+ resume_read
+ on_host_connected(host)
+ }
+ }
+ }
+ } else {
+ on_host_connected(host)
+ }
+ }
+ }
+ }
+ }
+
+ def on_host_connected(host:VirtualHost):Unit = {
+ MqttSessionManager.attach(host, connect_message.clientId(), this)
+ }
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Other msic bits.
+ //
+ /////////////////////////////////////////////////////////////////////
+ var messages_sent = 0L
+ var messages_received = 0L
+ var subscription_count = 0
+
+ override def create_connection_status = {
+ var rc = new MqttConnectionStatusDTO
+ rc.protocol_version = "3.1"
+ rc.messages_sent = messages_sent
+ rc.messages_received = messages_received
+ rc.subscription_count = subscription_count
+ rc.waiting_on = status()
+ rc
+ }
+
+}
+
+
+/**
+ * Tracks active sessions so that we can ensure that a given
+ * session id is only associated with once connection
+ * at a time. If a client tries to establish a 2nd
+ * connection, the first one will be closed before the session
+ * is switch to the new connection.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object MqttSessionManager {
+
+ val queue = createQueue("session manager")
+
+ class SessionState {
+ var durable_sub:SubscriptionAddress = _
+ val subscriptions = HashMap[UTF8Buffer, (Topic, BindAddress)]()
+ val received_message_ids: HashSet[Short] = new HashSet[Short]
+
+ trait StorageStrategy {
+ def update(cb: =>Unit)
+ def destroy(cb: =>Unit)
+ def create(store:Store, client_id:UTF8Buffer)
+ }
+ case class NoopStrategy() extends StorageStrategy {
+ def update(cb: =>Unit) = { cb }
+ def destroy(cb: =>Unit) { cb }
+ def create(store:Store, client_id:UTF8Buffer) = {
+ if(store!=null)
+ strategy = StoreStrategy(store, client_id)
+ }
+ }
+
+ case class StoreStrategy(store:Store, client_id:UTF8Buffer) extends StorageStrategy {
+ val session_key = new UTF8Buffer("mqtt:"+client_id)
+ def update(cb: =>Unit) = {
+ val uow = store.create_uow()
+ val session_pb = new SessionPB.Bean
+ session_pb.setClientId(client_id)
+ received_message_ids.foreach(session_pb.addReceivedMessageIds(_))
+ subscriptions.values.foreach { case (topic, address) =>
+ val topic_pb = new TopicPB.Bean
+ topic_pb.setName(topic.name())
+ topic_pb.setQos(topic.qos().ordinal())
+ topic_pb.setAddress(new UTF8Buffer(address.toString))
+ session_pb.addSubscriptions(topic_pb)
+ }
+ uow.put(session_key, session_pb.freeze().toUnframedBuffer)
+
+ val current = getCurrentQueue
+ uow.on_complete {
+ current {
+ cb
+ }
+ }
+ uow.release()
+ }
+
+ def destroy(cb: =>Unit) {
+ val uow = store.create_uow()
+ uow.put(session_key, null)
+ val current = getCurrentQueue
+ uow.on_complete {
+ current {
+ strategy = NoopStrategy()
+ cb
+ }
+ }
+ uow.release()
+ }
+ def create(store:Store, client_id:UTF8Buffer) = {
+ }
+ }
+ var strategy:StorageStrategy = new NoopStrategy
+
+ }
+
+
+ case class HostState(host:VirtualHost) {
+ val session_states = HashMap[UTF8Buffer, SessionState]()
+ val sessions = HashMap[UTF8Buffer, MqttSession]()
+
+ var loaded = false;
+ def on_load(func: =>Unit) = {
+ if( loaded ) {
+ func
+ } else {
+ if(host.store!=null) {
+ // We load all the persisted session's from the host's store when we are first accessed.
+ queue.suspend()
+ host.store.get_prefixed_map_entries(new AsciiBuffer("mqtt:")) { entries =>
+ queue.resume()
+ queue {
+ for( (_, value) <- entries ) {
+ import collection.JavaConversions._
+ val session_pb = SessionPB.FACTORY.parseUnframed(value)
+ val session_state = new SessionState()
+ session_state.strategy.create(host.store, session_pb.getClientId)
+ if( session_pb.hasReceivedMessageIds ) {
+ session_state.received_message_ids ++= session_pb.getReceivedMessageIdsList.map(_.toShort)
+ }
+ if( session_pb.hasSubscriptions ) {
+ session_pb.getSubscriptionsList.foreach { sub =>
+ val address = SimpleAddress(sub.getAddress.toString)
+ val topic = new Topic(sub.getName, QoS.values()(sub.getQos))
+ session_state.subscriptions += sub.getName -> (topic,address)
+ }
+ }
+ session_states.put(session_pb.getClientId, session_state)
+ }
+ loaded = true
+ func
+ }
+ }
+ } else {
+ loaded = true
+ func
+ }
+ }
+ }
+ }
+
+ def attach(host:VirtualHost, client_id:UTF8Buffer, handler:MqttProtocolHandler) = queue {
+ val host_state = host.plugin_state(new HostState(host), classOf[HostState])
+ host_state.on_load {
+ host_state.sessions.get(client_id) match {
+ case Some(assignment) =>
+ assignment.connect(handler)
+ case None =>
+ val state = if( handler.connect_message.cleanSession() ) {
+ host_state.session_states.remove(client_id).getOrElse(new SessionState())
+ } else {
+ host_state.session_states.getOrElseUpdate(client_id, new SessionState())
+ }
+ val assignment = MqttSession(host_state, client_id, state)
+ assignment.connect(handler)
+ host_state.sessions.put(client_id, assignment)
+ }
+ }
+ }
+
+ def disconnect(host_state:HostState, client_id:UTF8Buffer, handler:MqttProtocolHandler) = queue {
+ host_state.sessions.get(client_id) match {
+ case Some(assignment) => assignment.disconnect(handler)
+ case None => // Don't expect this to hit.
+ }
+ }
+
+ def remove(host_state:HostState, client_id:UTF8Buffer) = queue {
+ host_state.sessions.remove(client_id)
+ }
+}
+
+/**
+ * An MqttSession can be switch from one connection/protocol handler to another,
+ * but it will only be associated with one at a time. An MqttSession tracks
+ * the state of the communication with a client.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+case class MqttSession(host_state:HostState, client_id:UTF8Buffer, session_state:SessionState) {
+
+ import MqttProtocolHandler._
+
+ def host = host_state.host
+
+ val queue = createQueue("mqtt: "+client_id)
+ var manager_disconnected = false
+
+ var handler:Option[MqttProtocolHandler] = None
+ var security_context:SecurityContext = _
+ var clean_session = false
+ var connect_message:CONNECT = _
+ var destination_parser = MqttProtocol.destination_parser
+
+ def connect(next:MqttProtocolHandler):Unit = queue {
+ if(manager_disconnected) {
+ // we are not the assignment anymore.. go to the session manager
+ // again to setup a new session.
+ MqttSessionManager.attach(host, client_id, next)
+ } else {
+
+ // so that we don't switch again until this current switch completes
+ queue.suspend()
+ if( handler != None ) {
+ detach
+ handler = None
+ }
+ queue {
+ handler=Some(next)
+ attach
+ }
+
+ // switch the connection to the session queue..
+ next.connection.set_dispatch_queue(queue) {
+ queue.resume()
+ }
+ }
+ }
+
+ def disconnect(prev:MqttProtocolHandler) = queue {
+ if( handler==Some(prev) ) {
+ MqttSessionManager.remove(host_state, client_id)
+ manager_disconnected = true
+ detach
+ handler = None
+ }
+ }
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Bits that deal with connections attaching/detaching from the session
+ //
+ /////////////////////////////////////////////////////////////////////
+ def attach = {
+ queue.assertExecuting()
+ val h = handler.get
+ clean_session = h.connect_message.cleanSession()
+ security_context = h.security_context
+ h.command_handler = on_transport_command _
+ destination_parser = h.destination_parser
+ mqtt_consumer.consumer_sink.downstream = Some(h.sink_manager.open)
+
+ def ack_connect = {
+ queue.assertExecuting()
+ connect_message = h.connect_message
+ val connack = new CONNACK
+ connack.code(CONNECTION_ACCEPTED)
+ send(connack)
+ }
+
+ if( !clean_session ) {
+ // Setup the previous subscriptions..
+ session_state.strategy.create(host.store, client_id)
+ if( !session_state.subscriptions.isEmpty ) {
+ h.suspend_read("subscribing")
+ subscribe(session_state.subscriptions.map(_._2._1)) {
+ h.resume_read()
+ h.queue {
+ ack_connect
+ }
+ }
+ } else {
+ ack_connect
+ }
+ } else {
+ // do we need to clear the received ids?
+ // durable_session_state.received_message_ids.clear()
+ session_state.subscriptions.clear()
+ if( session_state.durable_sub !=null ) {
+ var addresses = Array(session_state.durable_sub)
+ session_state.durable_sub = null
+ host.dispatch_queue {
+ host.router.delete(addresses, security_context)
+ }
+ }
+ session_state.strategy.destroy {
+ ack_connect
+ }
+ }
+
+ }
+
+ def detach:Unit = {
+ queue.assertExecuting()
+
+ if(!producerRoutes.isEmpty) {
+ import collection.JavaConversions._
+ val routes = producerRoutes.values.toSeq.toArray
+ host.dispatch_queue {
+ routes.foreach { route=>
+ host.router.disconnect(Array(route.address), route)
+ }
+ }
+ producerRoutes.clear
+ }
+
+ if( clean_session ) {
+ if(!mqtt_consumer.addresses.isEmpty) {
+ var addresses = mqtt_consumer.addresses.keySet.toArray
+ host.dispatch_queue {
+ host.router.unbind(addresses, mqtt_consumer, false , security_context)
+ }
+ mqtt_consumer.addresses.clear()
+ }
+ session_state.subscriptions.clear()
+ } else {
+ if(session_state.durable_sub!=null) {
+ var addresses = Array(session_state.durable_sub)
+ host.dispatch_queue {
+ host.router.unbind(addresses, mqtt_consumer, false , security_context)
+ }
+ mqtt_consumer.addresses.clear()
+ session_state.durable_sub = null
+ }
+ }
+
+ in_flight_publishes.values.foreach { request =>
+ if( request.ack!=null ) {
+ if(request.delivered) {
+ request.ack(Delivered)
+ } else {
+ request.ack(Undelivered)
+ }
+ }
+ }
+ in_flight_publishes.clear()
+
+ handler.get.sink_manager.close(mqtt_consumer.consumer_sink.downstream.get, (request)=>{})
+ mqtt_consumer.consumer_sink.downstream = None
+
+ handler.get.on_transport_disconnected()
+ }
+
+ def decode_destination(value:UTF8Buffer):SimpleAddress = {
+ val rc = destination_parser.decode_single_destination(value.toString, (name)=>{
+ SimpleAddress("topic", destination_parser.decode_path(name))
+ })
+ if( rc==null ) {
+ handler.foreach(_.die("Invalid mqtt destination name: "+value))
+ }
+ rc
+ }
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Bits that deal with assigning message ids to QoS > 0 requests
+ // and tracking those requests so that they can get replayed on a
+ // reconnect.
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ var in_flight_publishes = HashMap[Short, Request]()
+
+ def send(message: MessageSupport.Message): Unit = {
+ queue.assertExecuting()
+ handler.foreach(_.connection_sink.offer(Request(0, message, null)))
+ }
+
+ def publish_completed(id: Short): Unit = {
+ queue.assertExecuting()
+ in_flight_publishes.remove(id) match {
+ case Some(request) =>
+ if ( request.ack != null ) {
+ request.ack(Consumed)
+ }
+ case None =>
+ // It's possible that on a reconnect, we get an ACK
+ // in for message that was not dispatched yet. store
+ // a place holder so we ack it upon the dispatch
+ // attempt.
+ in_flight_publishes.put(id, Request(id, null, null))
+ }
+ }
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Bits that deal with processing new messages from the client.
+ //
+ /////////////////////////////////////////////////////////////////////
+ def on_transport_command(command:AnyRef):Unit = command match {
+ case command:MQTTFrame=>
+
+ command.messageType() match {
+
+ case PUBLISH.TYPE =>
+ on_mqtt_publish(received(new PUBLISH().decode(command)))
+
+ // This follows a Publish with QoS EXACTLY_ONCE
+ case PUBREL.TYPE =>
+ var ack = received(new PUBREL().decode(command))
+ // TODO: perhaps persist the processed list.. otherwise
+ // we can't filter out dups after a broker restart.
+ session_state.received_message_ids.remove(ack.messageId)
+ session_state.strategy.update {
+ send(new PUBCOMP().messageId(ack.messageId))
+ }
+
+ case SUBSCRIBE.TYPE =>
+ on_mqtt_subscribe(received(new SUBSCRIBE().decode(command)))
+
+ case UNSUBSCRIBE.TYPE =>
+ on_mqtt_unsubscribe(received(new UNSUBSCRIBE().decode(command)))
+
+ // AT_LEAST_ONCE ack flow for a client subscription
+ case PUBACK.TYPE =>
+ val ack = received(new PUBACK().decode(command))
+ publish_completed(ack.messageId)
+
+ // EXACTLY_ONCE ack flow for a client subscription
+ case PUBREC.TYPE =>
+ val ack = received(new PUBREC().decode(command))
+ send(new PUBREL().messageId(ack.messageId))
+
+ case PUBCOMP.TYPE =>
+ val ack: PUBCOMP = received(new PUBCOMP().decode(command))
+ publish_completed(ack.messageId)
+
+ case PINGREQ.TYPE =>
+ received(new PINGREQ().decode(command))
+ send(new PINGRESP())
+
+ case DISCONNECT.TYPE =>
+ received(new DISCONNECT())
+ MqttSessionManager.disconnect(host_state, client_id, handler.get)
+
+ case _ =>
+ handler.get.die("Invalid MQTT message type: "+command.messageType());
+ }
+ case "failure" =>
+ // Publish the client's will
+ publish_will {
+ // then disconnect him.
+ MqttSessionManager.disconnect(host_state, client_id, handler.get)
+ }
+
+ case _=>
+ handler.get.die("Internal Server Error: unexpected mqtt command: "+command.getClass);
+ }
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Bits that deal with processing PUBLISH messages
+ //
+ /////////////////////////////////////////////////////////////////////
+ var producerRoutes = new LRUCache[UTF8Buffer, MqttProducerRoute](10) {
+ override def onCacheEviction(eldest: Entry[UTF8Buffer, MqttProducerRoute]) = {
+ host.router.disconnect(Array(eldest.getValue.address), eldest.getValue)
+ }
+ }
+ case class MqttProducerRoute(address:SimpleAddress, handler:MqttProtocolHandler) extends DeliveryProducerRoute(host.router) {
+ override def send_buffer_size = handler.codec.getReadBufferSize
+ override def connection = Some(handler.connection)
+ override def dispatch_queue = queue
+ refiller = ^{
+ handler.resume_read
+ }
+ }
+
+ def on_mqtt_publish(publish:PUBLISH):Unit = {
+
+ if( (publish.qos eq EXACTLY_ONCE) && session_state.received_message_ids.contains(publish.messageId)) {
+ val response = new PUBREC
+ response.messageId(publish.messageId)
+ send(response)
+ return
+ }
+
+ handler.get.messages_received += 1
+
+ queue.assertExecuting()
+ producerRoutes.get(publish.topicName()) match {
+ case null =>
+ // create the producer route...
+
+ val destination = decode_destination(publish.topicName())
+ val route = MqttProducerRoute(destination, handler.get)
+
+ // don't process commands until producer is connected...
+ route.handler.suspend_read("route publish lookup")
+ host.dispatch_queue {
+ host.router.connect(Array(destination), route, security_context)
+ queue {
+ // We don't care if we are not allowed to send..
+ if (!route.handler.connection.stopped) {
+ route.handler.resume_read
+ producerRoutes.put(publish.topicName(), route)
+ send_via_route(route, publish)
+ }
+ }
+ }
+
+ case route =>
+ // we can re-use the existing producer route
+ send_via_route(route, publish)
+ }
+ }
+
+ def send_via_route(route:DeliveryProducerRoute, publish:PUBLISH):Unit = {
+ queue.assertExecuting()
+
+ def at_least_once_ack(r:DeliveryResult, uow:StoreUOW):Unit = queue {
+ val response = new PUBACK
+ response.messageId(publish.messageId)
+ send(response)
+ }
+
+ def exactly_once_ack(r:DeliveryResult, uow:StoreUOW):Unit = queue {
+ queue.assertExecuting()
+ // TODO: perhaps persist the processed list..
+ session_state.received_message_ids.add(publish.messageId)
+ session_state.strategy.update {
+ val response = new PUBREC
+ response.messageId(publish.messageId)
+ send(response)
+ }
+ }
+
+ val ack = publish.qos match {
+ case AT_LEAST_ONCE => at_least_once_ack _
+ case EXACTLY_ONCE => exactly_once_ack _
+ case AT_MOST_ONCE => null
+ }
+
+ if( !route.targets.isEmpty ) {
+ val delivery = new Delivery
+ delivery.message = RawMessage(publish.payload)
+ delivery.persistent = publish.qos().ordinal() > 0
+ delivery.size = publish.payload.length
+ delivery.ack = ack
+ if( publish.retain() ) {
+ if( delivery.size == 0 ) {
+ delivery.retain = RetainRemove
+ } else {
+ delivery.retain = RetainSet
+ }
+ }
+
+ // routes can always accept at least 1 delivery...
+ assert( !route.full )
+ route.offer(delivery)
+ if( route.full ) {
+ // but once it gets full.. suspend to flow control the producer.
+ handler.get.suspend_read("blocked sending to: "+route.overflowSessions.mkString(", "))
+ }
+
+ } else {
+ ack(null, null)
+ }
+ }
+
+
+ //
+ def publish_will(complete_close: =>Unit) = {
+ if(connect_message!=null) {
+ if( connect_message.willTopic()==null ) {
+ complete_close
+ } else {
+
+ val destination = decode_destination(connect_message.willTopic())
+ val prodcuer = new DeliveryProducerRoute(host.router) {
+ override def send_buffer_size = 1024*64
+ override def connection = handler.map(_.connection)
+ override def dispatch_queue = queue
+ refiller = NOOP
+ }
+
+ host.dispatch_queue {
+ host.router.connect(Array(destination), prodcuer, security_context)
+ queue {
+ if(prodcuer.targets.isEmpty) {
+ complete_close
+ } else {
+ val delivery = new Delivery
+ delivery.message = RawMessage(connect_message.willMessage())
+ delivery.size = connect_message.willMessage().length
+ delivery.persistent = connect_message.willQos().ordinal() > 0
+ if( connect_message.willRetain() ) {
+ if( delivery.size == 0 ) {
+ delivery.retain = RetainRemove
+ } else {
+ delivery.retain = RetainSet
+ }
+ }
+
+ delivery.ack = (x,y) => {
+ host.dispatch_queue {
+ host.router.disconnect(Array(destination), prodcuer)
+ }
+ complete_close
+ }
+ handler.get.messages_received += 1
+ prodcuer.offer(delivery)
+ }
+ }
+ }
+ }
+ }
+ }
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Bits that deal with subscriptions
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ def on_mqtt_subscribe(sub:SUBSCRIBE):Unit = {
+ subscribe(sub.topics()) {
+ queue {
+ session_state.strategy.update {
+ val suback = new SUBACK
+ suback.messageId(sub.messageId())
+ suback.grantedQos(sub.topics().map(_.qos().ordinal().toByte))
+ send(suback)
+ }
+ }
+ }
+ }
+
+ def subscribe(topics:Traversable[Topic])(on_subscribed: => Unit):Unit = {
+ var addresses:Array[_ <: BindAddress] = topics.toArray.map { topic =>
+ var address:BindAddress = decode_destination(topic.name)
+ session_state.subscriptions += topic.name -> (topic, address)
+ mqtt_consumer.addresses += address -> topic.qos
+ if(PathParser.containsWildCards(address.path)) {
+ mqtt_consumer.wildcards.put( address.path, topic.qos() )
+ }
+ address
+ }
+
+ handler.get.subscription_count = mqtt_consumer.addresses.size
+
+ addresses = if( clean_session ) {
+ addresses
+ } else {
+ session_state.durable_sub = SubscriptionAddress(Path(client_id.toString), null, mqtt_consumer.addresses.keySet.toArray)
+ Array(session_state.durable_sub)
+ }
+
+ host.dispatch_queue {
+ addresses.foreach { address=>
+ host.router.bind(Array[BindAddress](address), mqtt_consumer, security_context)
+ // MQTT ignores subscribe failures.
+ }
+ on_subscribed
+ }
+
+ }
+
+ def on_mqtt_unsubscribe(unsubscribe:UNSUBSCRIBE):Unit = {
+
+ val addresses:Array[_ <: BindAddress] = unsubscribe.topics.flatMap { topic =>
+ session_state.subscriptions.remove(topic).map { case (topic, address)=>
+ mqtt_consumer.addresses.remove(address)
+ if(PathParser.containsWildCards(address.path)) {
+ mqtt_consumer.wildcards.remove(address.path, topic.qos)
+ }
+ address
+ }
+ }
+
+ handler.get.subscription_count = mqtt_consumer.addresses.size
+
+ if(!clean_session) {
+ session_state.durable_sub = SubscriptionAddress(Path(client_id.toString), null, mqtt_consumer.addresses.keySet.toArray)
+ }
+
+ host.dispatch_queue {
+ if(clean_session) {
+ host.router.unbind(addresses, mqtt_consumer, false, security_context)
+ } else {
+ if( mqtt_consumer.addresses.isEmpty ) {
+ host.router.unbind(Array(session_state.durable_sub), mqtt_consumer, true, security_context)
+ session_state.durable_sub = null
+ } else {
+ host.router.bind(Array(session_state.durable_sub), mqtt_consumer, security_context)
+ }
+ }
+ queue {
+ session_state.strategy.update {
+ val ack = new UNSUBACK
+ ack.messageId(unsubscribe.messageId())
+ send(ack)
+ }
+ }
+ }
+
+ }
+
+ var publish_body = false
+
+ lazy val mqtt_consumer = new MqttConsumer
+ class MqttConsumer extends BaseRetained with DeliveryConsumer {
+
+ override def toString = "mqtt client:"+client_id+" remote address: "+security_context.remote_address
+
+ val addresses = HashMap[BindAddress, QoS]()
+ val wildcards = new PathMap[QoS]()
+
+ val credit_window_source = createSource(new EventAggregator[(Int, Int), (Int, Int)] {
+ def mergeEvent(previous:(Int, Int), event:(Int, Int)) = {
+ if( previous == null ) {
+ event
+ } else {
+ (previous._1+event._1, previous._2+event._2)
+ }
+ }
+ def mergeEvents(previous:(Int, Int), events:(Int, Int)) = mergeEvent(previous, events)
+ }, dispatch_queue)
+
+ credit_window_source.setEventHandler(^{
+ val data = credit_window_source.getData
+ credit_window_filter.credit(data._1, data._2)
+ });
+ credit_window_source.resume
+
+ val consumer_sink = new MutableSink[Request]()
+ consumer_sink.downstream = None
+
+ var next_seq_id = 1L
+ def get_next_seq_id = {
+ val rc = next_seq_id
+ next_seq_id += 1
+ rc
+ }
+
+ def to_message_id(value:Long):Short = (
+ 0x8000 | // MQTT message ids cannot be zero, so we always set the highest bit.
+ (value & 0x7FFF) // the lower 15 bits come for the original seq id.
+ ).toShort
+
+ val credit_window_filter = new CreditWindowFilter[(Session[Delivery], Delivery)](consumer_sink.flatMap{ event =>
+ queue.assertExecuting()
+ val (session, delivery) = event
+
+ // Look up which QoS we need to send this message with..
+ var topic = delivery.sender.head.simple
+ import collection.JavaConversions._
+ addresses.get(topic).orElse(wildcards.get(topic.path).headOption) match {
+
+ case None =>
+ // draining messages after an un-subscribe
+ acked(delivery, Consumed)
+ None
+
+ case Some(qos) =>
+
+ // Convert the Delivery into a Request
+ var publish = new PUBLISH
+ publish.topicName(new UTF8Buffer(destination_parser.encode_destination(Array(delivery.sender.head))))
+ if( delivery.redeliveries > 0) {
+ publish.dup(true)
+ }
+
+ if( delivery.message.codec eq RawMessageCodec ) {
+ publish.payload(delivery.message.asInstanceOf[RawMessage].payload)
+ } else {
+ if( publish_body ) {
+ publish.payload(delivery.message.getBodyAs(classOf[Buffer]))
+ } else {
+ publish.payload(delivery.message.encoded)
+ }
+ }
+
+ handler.get.messages_sent += 1
+
+ if (delivery.ack!=null && (qos ne AT_MOST_ONCE)) {
+ publish.qos(qos)
+ val id = to_message_id(if(clean_session) {
+ get_next_seq_id // generate our own seq id.
+ } else {
+ delivery.seq // use the durable sub's seq id..
+ })
+
+ publish.messageId(id)
+ val request = Request(id, publish, (result)=>{acked(delivery, result)})
+ in_flight_publishes.put(id, request) match {
+ case Some(r) =>
+ // A reconnecting client could have acked before
+ // we get dispatched by the durable sub.
+ if( r.message == null ) {
+ in_flight_publishes.remove(id)
+ acked(delivery, Consumed)
+ } else {
+ // Looks we sent out a msg with that id. This could only
+ // happen once we send out 0x7FFF message and the first
+ // one has not been acked.
+ handler.foreach(_.async_die("Client not acking regularly.", null))
+ }
+ case None =>
+ }
+
+ Some(request)
+
+ } else {
+ // This callback gets executed once the message
+ // sent to the transport.
+ publish.qos(AT_MOST_ONCE)
+ Some(Request(0, publish, (result)=>{ acked(delivery, result) }))
+ }
+ }
+
+ }, SessionDeliverySizer)
+
+ def acked(delivery:Delivery, result:DeliveryResult) = {
+ queue.assertExecuting()
+ credit_window_source.merge((delivery.size, 1))
+ if( delivery.ack!=null ) {
+ delivery.ack(result, null)
+ }
+ }
+
+ credit_window_filter.credit(handler.get.codec.getWriteBufferSize*2, 1)
+
+ val session_manager = new SessionSinkMux[Delivery](credit_window_filter, queue, Delivery, Integer.MAX_VALUE/2, receive_buffer_size) {
+ override def time_stamp = host.broker.now
+ }
+
+ override def dispose() = queue {
+ super.dispose()
+ }
+
+ def dispatch_queue = queue
+ override def connection = handler.map(_.connection)
+ override def receive_buffer_size = 1024*64; // handler.codec.getWriteBufferSize
+ def is_persistent = false
+ def matches(delivery:Delivery):Boolean = true
+
+ //
+ // Each destination we subscribe to will establish a session with us.
+ //
+ class MqttConsumerSession(val producer:DeliveryProducer) extends DeliverySession with SessionSinkFilter[Delivery] {
+ producer.dispatch_queue.assertExecuting()
+ retain
+
+ val downstream = session_manager.open(producer.dispatch_queue)
+
+ override def toString = "connection to "+handler.map(_.connection.transport.getRemoteAddress).getOrElse("unconnected")
+
+ def consumer = mqtt_consumer
+ var closed = false
+
+ def close = {
+ assert(producer.dispatch_queue.isExecuting)
+ if( !closed ) {
+ closed = true
+ dispose
+ }
+ }
+
+ def dispose = {
+ session_manager.close(downstream, (delivery)=>{
+ // We have been closed so we have to nak any deliveries.
+ if( delivery.ack!=null ) {
+ delivery.ack(Undelivered, delivery.uow)
+ }
+ })
+ release
+ }
+
+ // Delegate all the flow control stuff to the session
+ override def full = {
+ val rc = super.full
+ rc
+ }
+
+ def offer(delivery:Delivery) = {
+ if( full ) {
+ false
+ } else {
+ delivery.message.retain()
+ val rc = downstream.offer(delivery)
+ assert(rc, "offer should be accepted since it was not full")
+ true
+ }
+ }
+
+ }
+ def connect(p:DeliveryProducer) = new MqttConsumerSession(p)
+ }
+
+}
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.scala?rev=1390048&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/Module.scala Tue Sep 25 18:53:01 2012
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt.dto
+
+import org.apache.activemq.apollo.util.DtoModule
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class Module extends DtoModule {
+ def dto_package = "org.apache.activemq.apollo.mqtt.dto"
+
+ def extension_classes = Array(classOf[MqttDTO], classOf[MqttConnectionStatusDTO])
+}
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/MqttConnectionStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/MqttConnectionStatusDTO.java?rev=1390048&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/MqttConnectionStatusDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/MqttConnectionStatusDTO.java Tue Sep 25 18:53:01 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt.dto;
+
+import org.apache.activemq.apollo.dto.ConnectionStatusDTO;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name="mqtt_connection_status")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class MqttConnectionStatusDTO extends ConnectionStatusDTO {
+
+}
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/MqttDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/MqttDTO.java?rev=1390048&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/MqttDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/MqttDTO.java Tue Sep 25 18:53:01 2012
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.mqtt.dto;
+
+import org.apache.activemq.apollo.dto.ProtocolDTO;
+import org.apache.activemq.apollo.dto.ProtocolFilterDTO;
+
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Allow you to customize the mqtt protocol implementation.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name="mqtt")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class MqttDTO extends ProtocolDTO {
+
+ @XmlAttribute(name="max_message_length")
+ public Integer max_message_length;
+
+ @XmlElementRef
+ public List<ProtocolFilterDTO> protocol_filters = new ArrayList<ProtocolFilterDTO>();
+
+ @XmlAttribute(name="queue_prefix")
+ public String queue_prefix;
+
+ @XmlAttribute(name="path_separator")
+ public String path_separator;
+
+ @XmlAttribute(name="any_child_wildcard")
+ public String any_child_wildcard;
+
+ @XmlAttribute(name="any_descendant_wildcard")
+ public String any_descendant_wildcard;
+
+ @XmlAttribute(name="regex_wildcard_start")
+ public String regex_wildcard_start;
+
+ @XmlAttribute(name="regex_wildcard_end")
+ public String regex_wildcard_end;
+
+ @XmlAttribute(name="part_pattern")
+ public String part_pattern;
+
+ @XmlAttribute(name="die_delay")
+ public Long die_delay;
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+
+ MqttDTO mqttDTO = (MqttDTO) o;
+
+ if (any_child_wildcard != null ? !any_child_wildcard.equals(mqttDTO.any_child_wildcard) : mqttDTO.any_child_wildcard != null)
+ return false;
+ if (any_descendant_wildcard != null ? !any_descendant_wildcard.equals(mqttDTO.any_descendant_wildcard) : mqttDTO.any_descendant_wildcard != null)
+ return false;
+ if (max_message_length != null ? !max_message_length.equals(mqttDTO.max_message_length) : mqttDTO.max_message_length != null)
+ return false;
+ if (path_separator != null ? !path_separator.equals(mqttDTO.path_separator) : mqttDTO.path_separator != null)
+ return false;
+ if (protocol_filters != null ? !protocol_filters.equals(mqttDTO.protocol_filters) : mqttDTO.protocol_filters != null)
+ return false;
+ if (queue_prefix != null ? !queue_prefix.equals(mqttDTO.queue_prefix) : mqttDTO.queue_prefix != null)
+ return false;
+ if (regex_wildcard_end != null ? !regex_wildcard_end.equals(mqttDTO.regex_wildcard_end) : mqttDTO.regex_wildcard_end != null)
+ return false;
+ if (regex_wildcard_start != null ? !regex_wildcard_start.equals(mqttDTO.regex_wildcard_start) : mqttDTO.regex_wildcard_start != null)
+ return false;
+ if (part_pattern != null ? !part_pattern.equals(mqttDTO.part_pattern) : mqttDTO.part_pattern != null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + (max_message_length != null ? max_message_length.hashCode() : 0);
+ result = 31 * result + (protocol_filters != null ? protocol_filters.hashCode() : 0);
+ result = 31 * result + (queue_prefix != null ? queue_prefix.hashCode() : 0);
+ result = 31 * result + (part_pattern != null ? part_pattern.hashCode() : 0);
+ result = 31 * result + (path_separator != null ? path_separator.hashCode() : 0);
+ result = 31 * result + (any_child_wildcard != null ? any_child_wildcard.hashCode() : 0);
+ result = 31 * result + (any_descendant_wildcard != null ? any_descendant_wildcard.hashCode() : 0);
+ result = 31 * result + (regex_wildcard_start != null ? regex_wildcard_start.hashCode() : 0);
+ result = 31 * result + (regex_wildcard_end != null ? regex_wildcard_end.hashCode() : 0);
+ return result;
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/package-info.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/package-info.java?rev=1390048&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/package-info.java (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/dto/package-info.java Tue Sep 25 18:53:01 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * The JAXB POJOs for the
+ * The JAXB POJOs for the
+ * <a href="http://activemq.apache.org/schema/activemq/apollo/xml-configuration.html">XML Configuration</a>
+ * of the ActiveMQ Broker.
+ */
+@javax.xml.bind.annotation.XmlSchema(
+ namespace = "http://activemq.apache.org/schema/activemq/apollo",
+ elementFormDefault = javax.xml.bind.annotation.XmlNsForm.QUALIFIED)
+package org.apache.activemq.apollo.mqtt.dto;
+
Added: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/webapp/WEB-INF/org/apache/activemq/apollo/mqtt/dto/MqttConnectionStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/webapp/WEB-INF/org/apache/activemq/apollo/mqtt/dto/MqttConnectionStatusDTO.jade?rev=1390048&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/webapp/WEB-INF/org/apache/activemq/apollo/mqtt/dto/MqttConnectionStatusDTO.jade (added)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/webapp/WEB-INF/org/apache/activemq/apollo/mqtt/dto/MqttConnectionStatusDTO.jade Tue Sep 25 18:53:01 2012
@@ -0,0 +1,53 @@
+-# Licensed to the Apache Software Foundation (ASF) under one or more
+-# contributor license agreements. See the NOTICE file distributed with
+-# this work for additional information regarding copyright ownership.
+-# The ASF licenses this file to You under the Apache License, Version 2.0
+-# (the "License"); you may not use this file except in compliance with
+-# the License. You may obtain a copy of the License at
+-#
+-# http://www.apache.org/licenses/LICENSE-2.0
+-#
+-# Unless required by applicable law or agreed to in writing, software
+-# distributed under the License is distributed on an "AS IS" BASIS,
+-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-# See the License for the specific language governing permissions and
+-# limitations under the License.
+-#
+
+
+- import it._
+- val helper = new org.apache.activemq.apollo.web.resources.ViewHelper
+- import helper._
+
+.breadcumbs
+ a(href={strip_resolve("..")}) Back
+
+p state: #{state} #{ uptime(state_since) } ago
+
+- if( state == "STARTED" )
+ form(method="post" action={path("action/delete")})
+ input(type="submit" value="shutdown")
+
+h4 Connection Info
+
+p connector: #{connector}
+p local address: #{local_address}
+p remote address: #{remote_address}
+
+h4 Protocol Info
+
+p protocol: #{protocol}
+p protocol version: #{protocol_version}
+p protocol session id: #{protocol_session_id}
+p user: #{user}
+p subscription count: #{subscription_count}
+p waiting on: #{waiting_on}
+
+h4 Metrics
+
+p messages received from the client: #{messages_received}
+p messages sent to the client: #{messages_sent}
+p bytes read counter: #{memory(read_counter)}
+p bytes written counter: #{memory(write_counter)}
+p last read size: #{memory(last_read_size)}
+p last write size: #{memory(last_write_size)}